1 /*
2  * Copyright (C) 2009 Bas Driessen <bas.driessen@xobas.com>
3  * Copyright (C) 2009 - 2013 Vivien Malerba <malerba@gnome-db.org>
4  * Copyright (C) 2010 David King <davidk@openismus.com>
5  * Copyright (C) 2010 Jonh Wendell <jwendell@gnome.org>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Lesser General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA  02110-1301, USA.
21  */
22 
23 //#define DEBUG_NOTIFICATION
24 
25 #include <string.h>
26 #include <glib/gi18n-lib.h>
27 #include "gda-thread-wrapper.h"
28 #include <libgda/gda-mutex.h>
29 #include <gobject/gvaluecollector.h>
30 #include <libgda/gda-debug-macros.h>
31 #include <libgda/gda-value.h>
32 #include <unistd.h>
33 #include <sys/stat.h>
34 #ifdef G_OS_WIN32
35 #include <fcntl.h>
36 #include <io.h>
37 #endif
38 
/* This GPrivate holds a pointer to the GAsyncQueue used by the job currently being treated
 * by the worker thread: the worker loop resets it to NULL before waiting for a job, and sets
 * it to the job's reply queue once a job has been popped.
 * It is used to avoid creating signal data for threads for which no job is being performed.
 */
GPrivate worker_thread_current_queue;
44 
/* short names for the internal structures defined in this file */
typedef struct _ThreadData ThreadData;
typedef struct _Job Job;
typedef struct _SignalSpec SignalSpec;
typedef struct _Pipe Pipe;

/* private data of a GdaThreadWrapper object */
struct _GdaThreadWrapperPrivate {
	GRecMutex    rmutex; /* locked around accesses to @next_job_id, @threads_hash and @pipes_hash */
	guint        next_job_id; /* ID to give to the next submitted job, starts at 1 */
	GThread     *worker_thread; /* the sub thread, runs worker_thread_entry_point() */
	GAsyncQueue *to_worker_thread; /* queue of Job pointers handed over to the worker thread */

	GHashTable  *threads_hash; /* key = a GThread, value = a #ThreadData pointer */
	GHashTable  *pipes_hash; /* key = a GThread, value = a #Pipe pointer; %NULL until a pipe is first requested */
};
59 
/*
 * Threads synchronization with notifications
 *
 * Both Unix and Windows create a set of 2 file descriptors, the one at position 0 for reading
 * and the one at position 1 for writing.
 *
 * A Pipe is reference counted: it is created by pipe_new() with one reference, and
 * pipe_ref() / pipe_unref() add and remove references.
 */
struct _Pipe {
	GThread     *thread; /* the thread which called pipe_new() */
	int          fds[2]; /* [0] for reading and [1] for writing */
	GIOChannel  *ioc; /* channel created on fds[0], the reading end */

	GMutex       mutex; /* locks @ref_count */
	guint        ref_count;
};

/* lock and unlock the mutex embedded in a Pipe (@x is cast to a Pipe pointer) */
#define pipe_lock(x) g_mutex_lock(& (((Pipe*)x)->mutex))
#define pipe_unlock(x) g_mutex_unlock(& (((Pipe*)x)->mutex))
77 
78 static Pipe *
pipe_ref(Pipe * p)79 pipe_ref (Pipe *p)
80 {
81 	if (p) {
82 		pipe_lock (p);
83 		p->ref_count++;
84 #ifdef DEBUG_NOTIFICATION
85 		g_print ("Pipe %p ++: %u\n", p, p->ref_count);
86 #endif
87 		pipe_unlock (p);
88 	}
89 	return p;
90 }
91 
92 static void
pipe_unref(Pipe * p)93 pipe_unref (Pipe *p)
94 {
95 	if (p) {
96 		pipe_lock (p);
97 		p->ref_count--;
98 #ifdef DEBUG_NOTIFICATION
99 		g_print ("Pipe %p --: %u\n", p, p->ref_count);
100 #endif
101 		if (p->ref_count == 0) {
102 			/* destroy @p */
103 			GMutex *m = &(p->mutex);
104 
105 			if (p->ioc)
106 				g_io_channel_unref (p->ioc);
107 #ifdef G_OS_WIN32
108 			if (p->fds[0] >= 0)
109 				_close (p->fds[0]);
110 			if (p->fds[1] >= 0)
111 				_close (p->fds[1]);
112 #else
113 			if (p->fds[0] >= 0)
114 				close (p->fds[0]);
115 			if (p->fds[1] >= 0)
116 				close (p->fds[1]);
117 #endif
118 
119 #ifdef DEBUG_NOTIFICATION
120 			g_print ("Destroyed Pipe %p\n", p);
121 #endif
122 			g_free (p);
123 
124 			g_mutex_unlock (m);
125 			g_mutex_clear (m);
126 		}
127 		else
128 			pipe_unlock (p);
129 	}
130 }
131 
132 /*
133  * May return %NULL
134  */
135 static Pipe *
pipe_new(void)136 pipe_new (void)
137 {
138 	Pipe *p;
139 
140 	p = g_new0 (Pipe, 1);
141 	g_mutex_init (&(p->mutex));
142 	p->ref_count = 1;
143 	p->thread = g_thread_self ();
144 #ifdef G_OS_WIN32
145 	if (_pipe (p->fds, 156, O_BINARY) != 0) {
146 #else
147 	if (pipe (p->fds) != 0) {
148 #endif
149 		pipe_unref (p);
150 		p = NULL;
151 		goto out;
152 	}
153 #ifdef G_OS_WIN32
154 	p->ioc = g_io_channel_win32_new_fd (p->fds [0]);
155 #else
156 	p->ioc = g_io_channel_unix_new (p->fds [0]);
157 #endif
158 
159 	/* we want raw data */
160 	if (g_io_channel_set_encoding (p->ioc, NULL, NULL) != G_IO_STATUS_NORMAL) {
161 		g_warning ("Can't set IO encoding to NULL\n");
162 		pipe_unref (p);
163 		p = NULL;
164 	}
165 
166  out:
167 #ifdef DEBUG_NOTIFICATION
168 	g_print ("Created Pipe %p\n", p);
169 #endif
170 	return p;
171 }
172 
173 static Pipe *
174 get_pipe (GdaThreadWrapper *wrapper, GThread *thread)
175 {
176 	Pipe *p = NULL;
177 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
178 	if (wrapper->priv->pipes_hash)
179 		p = g_hash_table_lookup (wrapper->priv->pipes_hash, thread);
180 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
181 	return p;
182 }
183 
/*
 * One instance for each job to execute (and its result) and
 * one instance for each emitted signal
 *
 * Created and destroyed exclusively by the thread(s) using the GdaThreadWrapper object,
 * except for the job where job->type == JOB_TYPE_DESTROY and for the jobs marked as cancelled,
 * which are destroyed by the sub thread.
 *
 * Passed to the sub thread through wrapper->priv->to_worker_thread
 */
typedef enum {
	JOB_TYPE_EXECUTE, /* a "real" job for the GdaThreadWrapper */
	JOB_TYPE_DESTROY, /* internal to signal the internal thread to shutdown */
	JOB_TYPE_SIGNAL, /* a signal from an object in the internal thread */
	JOB_TYPE_NOTIFICATION_ERROR /* internal to signal notification error and shutdown */
} JobType;
struct _Job {
	JobType                  type;
	gboolean                 processed; /* TRUE when worker thread has started to work on it */
	gboolean                 cancelled; /* TRUE when job has been cancelled before being executed */
	guint                    job_id;
	GdaThreadWrapperFunc     func; /* function to execute when a result is expected, otherwise %NULL */
	GdaThreadWrapperVoidFunc void_func; /* function to execute when @func is %NULL */
	gpointer                 arg; /* argument passed to @func or @void_func */
	GDestroyNotify           arg_destroy_func; /* if not %NULL, called on @arg (if not %NULL) when the job is freed */
	GAsyncQueue             *reply_queue; /* holds a ref to it */
	Pipe                    *notif; /* if not %NULL, notification when job finished */

	/* result part */
	union {
		/* used when @type is JOB_TYPE_EXECUTE */
		struct {
			gpointer             result;
			GError              *error;
		} exe;
		/* used when @type is JOB_TYPE_SIGNAL */
		struct {
			SignalSpec   *spec;
			guint         n_param_values;
			GValue       *param_values; /* array of GValue structures */
		} signal;
	} u;
};
/* cast of a generic pointer to a Job pointer */
#define JOB(x) ((Job*)(x))
225 static void
226 job_free (Job *job)
227 {
228 	pipe_unref (job->notif);
229 	if (job->arg && job->arg_destroy_func)
230 		job->arg_destroy_func (job->arg);
231 	if (job->reply_queue)
232 		g_async_queue_unref (job->reply_queue);
233 
234 	if (job->type == JOB_TYPE_EXECUTE) {
235 		if (job->u.exe.error)
236 			g_error_free (job->u.exe.error);
237 	}
238 	else if (job->type == JOB_TYPE_SIGNAL) {
239 		guint i;
240 		for (i = 0; i < job->u.signal.n_param_values; i++) {
241 			GValue *value = job->u.signal.param_values + i;
242 			if (G_VALUE_TYPE (value) != GDA_TYPE_NULL)
243 				g_value_reset (value);
244 		}
245 		g_free (job->u.signal.param_values);
246 	}
247 	else if (job->type == JOB_TYPE_DESTROY) {
248 		/* nothing to do here */
249 	}
250 	else if (job->type == JOB_TYPE_NOTIFICATION_ERROR) {
251 		/* nothing to do here */
252 	}
253 	else
254 		g_assert_not_reached ();
255 	g_free (job);
256 }
257 
/*
 * Signal specification, created when using _connect().
 *
 * A SignalSpec only exists as long as the corresponding ThreadData exists.
 *
 * The structure is reference counted, see signal_spec_ref() and signal_spec_unref().
 */
struct _SignalSpec {
        GSignalQuery  sigprop; /* must be first */

	gboolean      private; /* NOTE(review): set where the signal is connected, not visible here -- confirm meaning */
	GThread      *worker_thread;
	GAsyncQueue  *reply_queue; /* a ref is held here */
	Pipe         *notif; /* if not %NULL, notification */

        gpointer      instance; /* object the signal handler is connected to, %NULL once disconnected */
        gulong        signal_id; /* handler ID passed to g_signal_handler_disconnect(), 0 once disconnected */

        GdaThreadWrapperCallback callback;
        gpointer                 data;

	GMutex        mutex; /* locked through signal_spec_lock() */
	guint         ref_count;
};

/* lock and unlock the mutex embedded in a SignalSpec (@x is cast to a SignalSpec pointer) */
#define signal_spec_lock(x) g_mutex_lock(& (((SignalSpec*)x)->mutex))
#define signal_spec_unlock(x) g_mutex_unlock(& (((SignalSpec*)x)->mutex))
283 
284 /*
285  * call signal_spec_lock() before calling this function
286  */
287 static void
288 signal_spec_unref (SignalSpec *sigspec)
289 {
290 	sigspec->ref_count --;
291 	if (sigspec->ref_count == 0) {
292 		signal_spec_unlock (sigspec);
293 		g_mutex_clear (&(sigspec->mutex));
294 		if (sigspec->instance && (sigspec->signal_id > 0))
295 			g_signal_handler_disconnect (sigspec->instance, sigspec->signal_id);
296 		if (sigspec->reply_queue)
297 			g_async_queue_unref (sigspec->reply_queue);
298 		pipe_unref (sigspec->notif);
299 		g_free (sigspec);
300 	}
301 	else
302 		signal_spec_unlock (sigspec);
303 }
304 
305 /*
306  * call signal_spec_unlock() after this function
307  */
308 static SignalSpec *
309 signal_spec_ref (SignalSpec *sigspec)
310 {
311 	signal_spec_lock (sigspec);
312 	sigspec->ref_count ++;
313 	return sigspec;
314 }
315 
/*
 * Per thread accounting data: one instance for each thread using the GdaThreadWrapper,
 * stored in the wrapper's threads hash table.
 * Each new job increases the ref count
 * (NOTE(review): no reference count is declared in this structure -- this remark presumably
 * refers to the reply queue, on which each job holds a reference; confirm)
 */
struct _ThreadData {
	GThread     *owner; /* the thread this structure has been created for */
	GSList      *signals_list; /* list of SignalSpec pointers, owns all the structures */
	GAsyncQueue *from_worker_thread; /* holds a ref to it */

	GSList      *jobs; /* list of Job pointers not yet handled, or being handled (ie not yet popped from @from_worker_thread) */
	GSList      *results; /* list of Job pointers to completed jobs (ie. popped from @from_worker_thread) */

	Pipe        *notif; /* if not %NULL, notification when any job has finished */
};
/* cast of a generic pointer to a ThreadData pointer */
#define THREAD_DATA(x) ((ThreadData*)(x))
331 
332 static ThreadData *
333 get_thread_data (GdaThreadWrapper *wrapper, GThread *thread)
334 {
335 	ThreadData *td;
336 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
337 	td = g_hash_table_lookup (wrapper->priv->threads_hash, thread);
338 	if (!td) {
339 		Pipe *p;
340 		p = get_pipe (wrapper, thread);
341 
342 		td = g_new0 (ThreadData, 1);
343 		td->owner = thread;
344 		td->from_worker_thread = g_async_queue_new_full ((GDestroyNotify) job_free);
345 		td->jobs = NULL;
346 		td->results = NULL;
347 		td->notif = pipe_ref (p);
348 		g_hash_table_insert (wrapper->priv->threads_hash, thread, td);
349 	}
350 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
351 	return td;
352 }
353 
354 static void
355 thread_data_free (ThreadData *td)
356 {
357 	pipe_unref (td->notif);
358 	g_async_queue_unref (td->from_worker_thread);
359 	td->from_worker_thread = NULL;
360 	g_assert (!td->jobs);
361 	if (td->results) {
362 		g_slist_foreach (td->results, (GFunc) job_free, NULL);
363 		g_slist_free (td->results);
364 		td->results = NULL;
365 	}
366 	if (td->signals_list) {
367 		GSList *list;
368 		for (list = td->signals_list; list; list = list->next) {
369 			/* clear SignalSpec */
370 			SignalSpec *sigspec = (SignalSpec*) list->data;
371 
372 			signal_spec_lock (sigspec);
373 			g_signal_handler_disconnect (sigspec->instance, sigspec->signal_id);
374 			sigspec->instance = NULL;
375 			sigspec->signal_id = 0;
376 			g_async_queue_unref (sigspec->reply_queue);
377 			sigspec->reply_queue = NULL;
378 			sigspec->callback = NULL;
379 			sigspec->data = NULL;
380 			signal_spec_unref (sigspec);
381 		}
382 		g_slist_free (td->signals_list);
383 	}
384 	g_free (td);
385 }
386 
/* GObject boilerplate: class and instance initialization, destruction and properties handling */
static void gda_thread_wrapper_class_init (GdaThreadWrapperClass *klass);
static void gda_thread_wrapper_init       (GdaThreadWrapper *wrapper, GdaThreadWrapperClass *klass);
static void gda_thread_wrapper_dispose    (GObject *object);
static void gda_thread_wrapper_set_property (GObject *object,
					     guint param_id,
					     const GValue *value,
					     GParamSpec *pspec);
static void gda_thread_wrapper_get_property (GObject *object,
					     guint param_id,
					     GValue *value,
					     GParamSpec *pspec);

/* properties (none is defined, PROP_0 is only a placeholder) */
enum {
	PROP_0
};

/* parent class, set in gda_thread_wrapper_class_init() and used to chain up in dispose() */
static GObjectClass *parent_class = NULL;
405 
406 /*
407  * GdaThreadWrapper class implementation
408  * @klass:
409  */
410 static void
411 gda_thread_wrapper_class_init (GdaThreadWrapperClass *klass)
412 {
413 	GObjectClass *object_class = G_OBJECT_CLASS (klass);
414 
415 	parent_class = g_type_class_peek_parent (klass);
416 
417 	/* Properties */
418         object_class->set_property = gda_thread_wrapper_set_property;
419         object_class->get_property = gda_thread_wrapper_get_property;
420 
421 	object_class->dispose = gda_thread_wrapper_dispose;
422 }
423 
424 static void
425 clean_notifications (GdaThreadWrapper *wrapper, ThreadData *td)
426 {
427 #ifdef DEBUG_NOTIFICATION
428 	g_print ("%s(Pipe:%p)\n", __FUNCTION__, td->notif);
429 #endif
430 	GSList *list;
431 	for (list = td->signals_list; list; list = list->next) {
432 		SignalSpec *sigspec;
433 		sigspec = (SignalSpec*) list->data;
434 		signal_spec_lock (sigspec);
435 		if (sigspec->notif == td->notif) {
436 			pipe_unref (sigspec->notif);
437 			sigspec->notif = NULL;
438 		}
439 		signal_spec_unlock (sigspec);
440 	}
441 
442 	pipe_unref (td->notif);
443 	td->notif = NULL;
444 
445 	g_hash_table_remove (wrapper->priv->pipes_hash, td->owner);
446 }
447 
448 /*
449  * @wrapper: (allow-none): may be %NULL
450  * @td: (allow-none): may be %NULL
451  *
452  * Either @wrapper and @td are both NULL, or they are both NOT NULL
453  *
454  * It is assumed that pipe_ref(p) has been called before calling this function
455  */
456 static gboolean
457 write_notification (GdaThreadWrapper *wrapper, ThreadData *td,
458 		    Pipe *p, GdaThreadNotificationType type, guint job_id)
459 {
460 	g_assert ((wrapper && td) || (!wrapper && !td));
461 
462 	if (!p || (p->fds[1] < 0)) {
463 		pipe_unref (p);
464 		return TRUE;
465 	}
466 #ifdef DEBUG_NOTIFICATION_FORCE
467 	static guint c = 0;
468 	c++;
469 	if (c == 4)
470 		goto onerror;
471 #endif
472 
473 	GdaThreadNotification notif;
474 	ssize_t nw;
475 	notif.type = type;
476 	notif.job_id = job_id;
477 #ifdef G_OS_WIN32
478 	nw = _write (p->fds[1], &notif, sizeof (notif));
479 #else
480 	nw = write (p->fds[1], &notif, sizeof (notif));
481 #endif
482 	if (nw != sizeof (notif)) {
483 		/* Error */
484 		goto onerror;
485 	}
486 #ifdef DEBUG_NOTIFICATION
487 	g_print ("Wrote notification %d.%u to pipe %p\n", type, job_id, p);
488 #endif
489 	pipe_unref (p);
490 	return TRUE;
491 
492  onerror:
493 #ifdef DEBUG_NOTIFICATION
494 	g_print ("%s(): returned FALSE\n", __FUNCTION__);
495 	g_print ("Closed FD %d\n", p->fds [1]);
496 #endif
497 	/* close the writing end of the pipe */
498 #ifdef G_OS_WIN32
499 	_close (p->fds [1]);
500 #else
501 	close (p->fds [1]);
502 #endif
503 	p->fds [1] = -1;
504 	pipe_unref (p);
505 	if (td)
506 		clean_notifications (wrapper, td);
507 	return FALSE;
508 }
509 
510 /*
511  * Executed in the sub thread:
512  * takes a Job in (from the wrapper->priv->to_worker_thread queue) and creates a new Result which
513  * it pushed to Job->reply_queue
514  */
515 static gpointer
516 worker_thread_entry_point (GAsyncQueue *to_worker_thread)
517 {
518 	GAsyncQueue *in;
519 
520 	in = to_worker_thread;
521 
522 	while (1) {
523 		Job *job;
524 
525 		/* pop next job and mark it as being processed */
526 		g_private_set (&worker_thread_current_queue, NULL);
527 		g_async_queue_lock (in);
528 		job = g_async_queue_pop_unlocked (in);
529 		job->processed = TRUE;
530 		g_private_set (&worker_thread_current_queue, job->reply_queue);
531 		g_async_queue_unlock (in);
532 
533 		if (job->cancelled) {
534 			job_free (job);
535 			continue;
536 		}
537 
538 		if (job->type == JOB_TYPE_DESTROY) {
539 			g_assert (! job->arg_destroy_func);
540 			job_free (job);
541 #ifdef THREAD_WRAPPER_DEBUG
542 			g_print ("... exit sub thread %p for wrapper\n", g_thread_self ());
543 #endif
544 
545 			/* exit sub thread */
546 			break;
547 		}
548 		else if (job->type == JOB_TYPE_EXECUTE) {
549 			if (job->func)
550 				job->u.exe.result = job->func (job->arg, &(job->u.exe.error));
551 			else {
552 				job->u.exe.result = NULL;
553 				job->void_func (job->arg, &(job->u.exe.error));
554 			}
555 
556 			guint jid = job->job_id;
557 			Pipe *jpipe = pipe_ref (job->notif);
558 			g_async_queue_push (job->reply_queue, job);
559 			if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid)) {
560 				Job *je = g_new0 (Job, 1);
561 				je->type = JOB_TYPE_NOTIFICATION_ERROR;
562 				g_async_queue_push (job->reply_queue, je);
563 			}
564 		}
565 		else
566 			g_assert_not_reached ();
567 	}
568 
569 	g_async_queue_unref (in);
570 
571 	return NULL;
572 }
573 
574 static void
575 gda_thread_wrapper_init (GdaThreadWrapper *wrapper, G_GNUC_UNUSED GdaThreadWrapperClass *klass)
576 {
577 	g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
578 
579 	wrapper->priv = g_new0 (GdaThreadWrapperPrivate, 1);
580 	g_rec_mutex_init (&(wrapper->priv->rmutex));
581 	wrapper->priv->next_job_id = 1;
582 
583 	wrapper->priv->threads_hash = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) thread_data_free);
584 
585 	wrapper->priv->to_worker_thread = g_async_queue_new ();
586 	wrapper->priv->worker_thread = g_thread_new ("worker",
587 						     (GThreadFunc) worker_thread_entry_point,
588 						     g_async_queue_ref (wrapper->priv->to_worker_thread)); /* inc. ref for sub thread usage */
589 
590 	wrapper->priv->pipes_hash = NULL;
591 
592 #ifdef THREAD_WRAPPER_DEBUG
593 	g_print ("... new wrapper %p, worker_thread=%p\n", wrapper, wrapper->priv->worker_thread);
594 #endif
595 }
596 
597 static gboolean
598 thread_data_remove_jobs_func (G_GNUC_UNUSED GThread *key, ThreadData *td, G_GNUC_UNUSED gpointer data)
599 {
600 	if (td->jobs)  {
601 		GSList *list;
602 		for (list = td->jobs; list; list = list->next) {
603 			Job *job = JOB (list->data);
604 			if (job->processed) {
605 				/* we can't free that job because it is probably being used by the
606 				 * worker thread, so just emit a warning
607 				 */
608 				if (job->arg_destroy_func) {
609 					g_warning ("The argument of Job ID %d will be destroyed by sub thread",
610 						   job->job_id);
611 				}
612 			}
613 			else {
614 				/* cancel this job */
615 				job->cancelled = TRUE;
616 				if (job->arg && job->arg_destroy_func) {
617 					job->arg_destroy_func (job->arg);
618 					job->arg = NULL;
619 				}
620 			}
621 		}
622 		g_slist_free (td->jobs);
623 		td->jobs = NULL;
624 	}
625 	return TRUE; /* remove this ThreadData */
626 }
627 
628 static void
629 gda_thread_wrapper_dispose (GObject *object)
630 {
631 	GdaThreadWrapper *wrapper = (GdaThreadWrapper *) object;
632 
633 	g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
634 
635 	if (wrapper->priv) {
636 		Job *job = g_new0 (Job, 1);
637 		job->type = JOB_TYPE_DESTROY;
638 		job->notif = NULL;
639 		g_async_queue_push (wrapper->priv->to_worker_thread, job);
640 #ifdef THREAD_WRAPPER_DEBUG
641 		g_print ("... pushed JOB_TYPE_DESTROY for wrapper %p\n", wrapper);
642 #endif
643 
644 		g_async_queue_lock (wrapper->priv->to_worker_thread);
645 		if (wrapper->priv->threads_hash) {
646 			g_hash_table_foreach_remove (wrapper->priv->threads_hash,
647 						     (GHRFunc) thread_data_remove_jobs_func, NULL);
648 			g_hash_table_destroy (wrapper->priv->threads_hash);
649 		}
650 		g_async_queue_unlock (wrapper->priv->to_worker_thread);
651 		g_async_queue_unref (wrapper->priv->to_worker_thread);
652 		wrapper->priv->worker_thread = NULL; /* side note: don't wait for sub thread to terminate */
653 
654 		g_rec_mutex_clear (&(wrapper->priv->rmutex));
655 
656 		if (wrapper->priv->pipes_hash)
657 			g_hash_table_destroy (wrapper->priv->pipes_hash);
658 
659 		g_free (wrapper->priv);
660 		wrapper->priv = NULL;
661 	}
662 
663 	/* chain to parent class */
664 	parent_class->dispose (object);
665 }
666 
667 /* module error */
668 GQuark gda_thread_wrapper_error_quark (void)
669 {
670         static GQuark quark;
671         if (!quark)
672                 quark = g_quark_from_static_string ("gda_thread_wrapper_error");
673         return quark;
674 }
675 
676 /**
677  * gda_thread_wrapper_get_type:
678  *
679  * Registers the #GdaThreadWrapper class on the GLib type system.
680  *
681  * Returns: the GType identifying the class.
682  */
683 GType
684 gda_thread_wrapper_get_type (void)
685 {
686         static GType type = 0;
687 
688         if (G_UNLIKELY (type == 0)) {
689                 static GMutex registering;
690                 static const GTypeInfo info = {
691                         sizeof (GdaThreadWrapperClass),
692                         (GBaseInitFunc) NULL,
693                         (GBaseFinalizeFunc) NULL,
694                         (GClassInitFunc) gda_thread_wrapper_class_init,
695                         NULL,
696                         NULL,
697                         sizeof (GdaThreadWrapper),
698                         0,
699                         (GInstanceInitFunc) gda_thread_wrapper_init,
700 			0
701                 };
702 
703                 g_mutex_lock (&registering);
704                 if (type == 0)
705                         type = g_type_register_static (G_TYPE_OBJECT, "GdaThreadWrapper", &info, 0);
706                 g_mutex_unlock (&registering);
707         }
708         return type;
709 }
710 
711 static void
712 gda_thread_wrapper_set_property (GObject *object,
713 			       guint param_id,
714 			       G_GNUC_UNUSED const GValue *value,
715 			       GParamSpec *pspec)
716 {
717 	GdaThreadWrapper *wrapper;
718 
719         wrapper = GDA_THREAD_WRAPPER (object);
720         if (wrapper->priv) {
721                 switch (param_id) {
722 		default:
723 			G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
724 			break;
725 		}
726 	}
727 }
728 
729 static void
730 gda_thread_wrapper_get_property (GObject *object,
731 			       guint param_id,
732 			       G_GNUC_UNUSED GValue *value,
733 			       GParamSpec *pspec)
734 {
735 	GdaThreadWrapper *wrapper;
736 
737 	wrapper = GDA_THREAD_WRAPPER (object);
738 	if (wrapper->priv) {
739 		switch (param_id) {
740 		default:
741 			G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
742 			break;
743 		}
744 	}
745 }
746 
747 /**
748  * gda_thread_wrapper_new:
749  *
750  * Creates a new #GdaThreadWrapper object
751  *
752  * Returns: a new #GdaThreadWrapper object, or %NULL if threads are not supported/enabled
753  *
754  * Since: 4.2
755  */
756 GdaThreadWrapper *
757 gda_thread_wrapper_new (void)
758 {
759 	return (GdaThreadWrapper *) g_object_new (GDA_TYPE_THREAD_WRAPPER, NULL);
760 }
761 
762 /**
763  * gda_thread_wrapper_get_io_channel:
764  * @wrapper: a #GdaThreadWrapper object
765  *
766  * Allow @wrapper to notify when an execution job is finished, by making its exec ID
767  * readable through a new #GIOChannel. This function is useful when the notification needs
768  * to be included into a main loop. This also notifies that signals (emitted by objects in
769  * @wrapper's internal thread) are available.
770  *
 * The returned #GIOChannel will have something to read every time an execution job is finished
 * for an execution job submitted from the calling thread. The user should read #GdaThreadNotification
 * structures from the channel and analyse their contents to call gda_thread_wrapper_iterate()
 * or gda_thread_wrapper_fetch_result().
775  *
776  * Note1: the new communication channel will only be operational for jobs submitted after this
777  * function returns, and for signals which have been connected after this function returns. A safe
778  * practice is to call this function before the @wrapper object has been used.
779  *
780  * Note2: this function will return the same #GIOChannel everytime it's called from the same thread.
781  *
782  * Note3: if the usage of the returned #GIOChannel reveals an error, then g_io_channel_shutdown() and
783  * g_io_channel_unref() should be called on the #GIOChannel to let @wrapper know it should not use
784  * that object anymore.
785  *
786  * Returns: (transfer none): a new #GIOChannel, or %NULL if it could not be created
787  *
788  * Since: 4.2.9
789  */
790 GIOChannel *
791 gda_thread_wrapper_get_io_channel (GdaThreadWrapper *wrapper)
792 {
793 	Pipe *p;
794 	GThread *th;
795 	g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), NULL);
796 	g_return_val_if_fail (wrapper->priv, NULL);
797 
798 	th = g_thread_self ();
799 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
800 	p = get_pipe (wrapper, th);
801 	if (!p) {
802 		p = pipe_new ();
803 		if (p) {
804 			if (! wrapper->priv->pipes_hash)
805 				wrapper->priv->pipes_hash = g_hash_table_new_full (g_direct_hash,
806 										   g_direct_equal,
807 										   NULL,
808 										   (GDestroyNotify) pipe_unref);
809 			g_hash_table_insert (wrapper->priv->pipes_hash, th, p);
810 		}
811 	}
812 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
813 	if (p)
814 		return p->ioc;
815 	else
816 		return NULL;
817 }
818 
819 /**
820  * gda_thread_wrapper_unset_io_channel:
821  * @wrapper: a #GdaThreadWrapper
822  *
823  * Does the opposite of gda_thread_wrapper_get_io_channel()
824  *
825  * Since: 4.2.9
826  */
827 void
828 gda_thread_wrapper_unset_io_channel (GdaThreadWrapper *wrapper)
829 {
830 	GThread *th;
831 	ThreadData *td;
832 
833 	g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
834 	g_return_if_fail (wrapper->priv);
835 
836 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
837 	th = g_thread_self ();
838 	td = g_hash_table_lookup (wrapper->priv->threads_hash, th);
839 	if (td) {
840 		Pipe *p;
841 		p = get_pipe (wrapper, th);
842 		if (p)
843 			clean_notifications (wrapper, td);
844 	}
845 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
846 }
847 
848 /**
849  * gda_thread_wrapper_execute:
850  * @wrapper: a #GdaThreadWrapper object
851  * @func: the function to execute, not %NULL
852  * @arg: (allow-none): argument to pass to @func, or %NULL
853  * @arg_destroy_func: (allow-none): function to be called when the execution has finished, to destroy @arg, or %NULL
854  * @error: a place to store errors, for errors occurring in this method, not errors occurring while @func
855  *         is executed, or %NULL
856  *
857  * Make @wrapper execute the @func function with the @arg argument (along with a #GError which is not @error)
858  * in the sub thread managed by @wrapper. To execute a function which does not return anything,
859  * use gda_thread_wrapper_execute_void().
860  *
861  * This method returns immediately, and the caller then needs to use gda_thread_wrapper_fetch_result() to
862  * check if the execution has finished and get the result.
863  *
864  * Once @func's execution is finished, if @arg is not %NULL, the @arg_destroy_func destruction function is called
865  * on @arg. This call occurs in the thread calling gda_thread_wrapper_fetch_result().
866  *
867  * If an error occurred in this function, then the @arg_destroy_func function is not called to free @arg.
868  *
869  * Returns: the job ID, or 0 if an error occurred
870  *
871  * Since: 4.2
872  */
873 guint
874 gda_thread_wrapper_execute (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func,
875 			    gpointer arg, GDestroyNotify arg_destroy_func, G_GNUC_UNUSED GError **error)
876 {
877 	Job *job;
878 	guint id;
879 	ThreadData *td;
880 
881 	g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), 0);
882 	g_return_val_if_fail (wrapper->priv, 0);
883 	g_return_val_if_fail (func, 0);
884 
885 	td = get_thread_data (wrapper, g_thread_self());
886 
887 	job = g_new0 (Job, 1);
888 	job->type = JOB_TYPE_EXECUTE;
889 	job->processed = FALSE;
890 	job->cancelled = FALSE;
891 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
892 	job->job_id = wrapper->priv->next_job_id++;
893 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
894 	job->func = func;
895 	job->void_func = NULL;
896 	job->arg = arg;
897 	job->arg_destroy_func = arg_destroy_func;
898 	job->reply_queue = g_async_queue_ref (td->from_worker_thread);
899 	job->notif = pipe_ref (td->notif);
900 
901 	id = job->job_id;
902 #ifdef THREAD_WRAPPER_DEBUG
903 	g_print ("... submitted job %d for wrapper %p from thread %p\n", id, wrapper, g_thread_self());
904 #endif
905 
906 	td->jobs = g_slist_append (td->jobs, job);
907 
908 	if (g_thread_self () == wrapper->priv->worker_thread) {
909 		job->processed = TRUE;
910                 if (job->func)
911                         job->u.exe.result = job->func (job->arg, &(job->u.exe.error));
912                 else {
913                         job->u.exe.result = NULL;
914                         job->void_func (job->arg, &(job->u.exe.error));
915                 }
916 #ifdef THREAD_WRAPPER_DEBUG
917 		g_print ("... IMMEDIATELY done job %d => %p\n", job->job_id, job->u.exe.result);
918 #endif
919 		guint jid = job->job_id;
920 		Pipe *jpipe = pipe_ref (job->notif);
921                 g_async_queue_push (job->reply_queue, job);
922 		write_notification (wrapper, td, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid);
923         }
924         else
925                 g_async_queue_push (wrapper->priv->to_worker_thread, job);
926 
927 	return id;
928 }
929 
930 /**
931  * gda_thread_wrapper_execute_void:
932  * @wrapper: a #GdaThreadWrapper object
933  * @func: the function to execute, not %NULL
934  * @arg: (allow-none): argument to pass to @func
935  * @arg_destroy_func: (allow-none): function to be called when the execution has finished, to destroy @arg, or %NULL
936  * @error: a place to store errors, for errors occurring in this method, not errors occurring while @func
937  *         is executed, or %NULL
938  *
939  * Make @wrapper execute the @func function with the @arg argument (along with a #GError which is not @error)
940  * in the sub thread managed by @wrapper. To execute a function which returns some pointer,
941  * use gda_thread_wrapper_execute().
942  *
943  * This method returns immediately. Calling gda_thread_wrapper_fetch_result() is not necessary as @func
944  * does not return any result. However, it may be necessary to call gda_thread_wrapper_iterate() to give @wrapper a
945  * chance to execute the @arg_destroy_func function if not %NULL (note that gda_thread_wrapper_iterate() is
946  * called by gda_thread_wrapper_fetch_result() itself).
947  *
948  * Once @func's execution is finished, if @arg is not %NULL, the @arg_destroy_func destruction function is called
949  * on @arg. This call occurs in the thread calling gda_thread_wrapper_fetch_result().
950  *
951  * If an error occurred in this function, then the @arg_destroy_func function is not called to free @arg.
952  *
953  * Returns: the job ID, or 0 if an error occurred
954  *
955  * Since: 4.2
956  */
957 guint
958 gda_thread_wrapper_execute_void (GdaThreadWrapper *wrapper, GdaThreadWrapperVoidFunc func,
959 				 gpointer arg, GDestroyNotify arg_destroy_func, G_GNUC_UNUSED GError **error)
960 {
961 	Job *job;
962 	guint id;
963 	ThreadData *td;
964 
965 	g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), 0);
966 	g_return_val_if_fail (wrapper->priv, 0);
967 	g_return_val_if_fail (func, 0);
968 
969 	td = get_thread_data (wrapper, g_thread_self());
970 
971 	job = g_new0 (Job, 1);
972 	job->type = JOB_TYPE_EXECUTE;
973 	job->processed = FALSE;
974 	job->cancelled = FALSE;
975 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
976 	job->job_id = wrapper->priv->next_job_id++;
977 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
978 	job->func = NULL;
979 	job->void_func = func;
980 	job->arg = arg;
981 	job->arg_destroy_func = arg_destroy_func;
982 	job->reply_queue = g_async_queue_ref (td->from_worker_thread);
983 	job->notif = pipe_ref (td->notif);
984 
985 	id = job->job_id;
986 #ifdef THREAD_WRAPPER_DEBUG
987 	g_print ("... submitted VOID job %d\n", id);
988 #endif
989 
990 	td->jobs = g_slist_append (td->jobs, job);
991 
992 	if (g_thread_self () == wrapper->priv->worker_thread) {
993 		job->processed = TRUE;
994                 if (job->func)
995                         job->u.exe.result = job->func (job->arg, &(job->u.exe.error));
996                 else {
997                         job->u.exe.result = NULL;
998                         job->void_func (job->arg, &(job->u.exe.error));
999                 }
1000 #ifdef THREAD_WRAPPER_DEBUG
1001 		g_print ("... IMMEDIATELY done VOID job %d => %p\n", job->job_id, job->u.exe.result);
1002 #endif
1003 		guint jid = job->job_id;
1004 		Pipe *jpipe = pipe_ref (job->notif);
1005                 g_async_queue_push (job->reply_queue, job);
1006 		write_notification (wrapper, td, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid);
1007         }
1008         else
1009 		g_async_queue_push (wrapper->priv->to_worker_thread, job);
1010 
1011 	return id;
1012 }
1013 
1014 /**
1015  * gda_thread_wrapper_cancel:
1016  * @wrapper: a #GdaThreadWrapper object
1017  * @id: the ID of a job as returned by gda_thread_wrapper_execute() or gda_thread_wrapper_execute_void()
1018  *
1019  * Cancels a job not yet executed. This may fail for the following reasons:
1020  * <itemizedlist>
1021  *  <listitem><para>the job @id could not be found, either because it has already been treated or because
1022  *                  it does not exist or because it was created in another thread</para></listitem>
1023  *  <listitem><para>the job @id is currently being treated by the worker thread</para></listitem>
1024  * </itemizedlist>
1025  *
1026  * Returns: %TRUE if the job has been cancelled, or %FALSE in any other case.
1027  *
1028  * Since: 4.2
1029  */
1030 gboolean
1031 gda_thread_wrapper_cancel (GdaThreadWrapper *wrapper, guint id)
1032 {
1033 	GSList *list;
1034 	gboolean retval = FALSE; /* default if job not found */
1035 	ThreadData *td;
1036 
1037 	g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), FALSE);
1038 
1039 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
1040 
1041 	td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1042 	if (!td) {
1043 		/* nothing to be done for this thread */
1044 		g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1045 		return FALSE;
1046 	}
1047 
1048 	g_async_queue_lock (wrapper->priv->to_worker_thread);
1049 	for (list = td->jobs; list; list = list->next) {
1050 		Job *job = JOB (list->data);
1051 		if (job->job_id == id) {
1052 			if (job->processed) {
1053 				/* can't cancel it as it's being treated */
1054 				break;
1055 			}
1056 
1057 			retval = TRUE;
1058 			job->cancelled = TRUE;
1059 			if (job->arg && job->arg_destroy_func) {
1060 				job->arg_destroy_func (job->arg);
1061 				job->arg = NULL;
1062 			}
1063 			td->jobs = g_slist_delete_link (td->jobs, list);
1064 			break;
1065 		}
1066 	}
1067 	g_async_queue_unlock (wrapper->priv->to_worker_thread);
1068 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1069 
1070 	return retval;
1071 }
1072 
1073 /**
1074  * gda_thread_wrapper_iterate:
1075  * @wrapper: a #GdaThreadWrapper object
1076  * @may_block: whether the call may block
1077  *
1078  * This method gives @wrapper a chance to check if some functions to be executed have finished
1079  * <emphasis>for the calling thread</emphasis>. In this case it handles the execution result and
1080  * makes it ready to be processed using gda_thread_wrapper_fetch_result().
1081  *
1082  * This method also allows @wrapper to handle signals which may have been emitted by objects
1083  * while in the worker thread, and call the callback function specified when gda_thread_wrapper_connect_raw()
1084  * was used.
1085  *
 * If @may_block is %TRUE, then it will block until there is one finished execution
1087  * (functions returning void and signals are ignored regarding this argument).
1088  *
1089  * Since: 4.2
1090  */
1091 void
1092 gda_thread_wrapper_iterate (GdaThreadWrapper *wrapper, gboolean may_block)
1093 {
1094 	ThreadData *td;
1095 	Job *job;
1096 
1097 	g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
1098 	g_return_if_fail (wrapper->priv);
1099 
1100 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
1101 	td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1102 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1103 	if (!td) {
1104 		/* nothing to be done for this thread */
1105 		return;
1106 	}
1107 
1108  again:
1109 	if (may_block)
1110 		job = g_async_queue_pop (td->from_worker_thread);
1111 	else
1112 		job = g_async_queue_try_pop (td->from_worker_thread);
1113 	if (job) {
1114 		gboolean do_again = FALSE;
1115 		td->jobs = g_slist_remove (td->jobs, job);
1116 #ifdef THREAD_WRAPPER_DEBUG
1117 		g_print ("Popped job %d (type %d), for wrapper %p from thread %p\n",
1118 			 job->job_id, job->type, wrapper, g_thread_self ());
1119 #endif
1120 
1121 		if (job->type == JOB_TYPE_EXECUTE) {
1122 			if (!job->func) {
1123 				job_free (job); /* ignore as there is no result */
1124 				do_again = TRUE;
1125 			}
1126 			else
1127 				td->results = g_slist_append (td->results, job);
1128 		}
1129 		else if (job->type == JOB_TYPE_SIGNAL) {
1130 			/* run callback now */
1131 			SignalSpec *spec = job->u.signal.spec;
1132 
1133 			if (spec->callback)
1134 				spec->callback (wrapper, spec->instance, ((GSignalQuery*)spec)->signal_name,
1135 						job->u.signal.n_param_values, job->u.signal.param_values, NULL,
1136 						spec->data);
1137 #ifdef THREAD_WRAPPER_DEBUG
1138 			else
1139 				g_print ("Not propagating signal %s\n", ((GSignalQuery*)spec)->signal_name);
1140 #endif
1141 			job->u.signal.spec = NULL;
1142 			job_free (job);
1143 			signal_spec_lock (spec);
1144 			signal_spec_unref (spec);
1145 			do_again = TRUE;
1146 		}
1147 		else if (job->type == JOB_TYPE_NOTIFICATION_ERROR) {
1148 			job_free (job);
1149 			clean_notifications (wrapper, td);
1150 		}
1151 		else
1152 			g_assert_not_reached ();
1153 
1154 		if (do_again)
1155 			goto again;
1156 	}
1157 }
1158 
1159 /**
1160  * gda_thread_wrapper_fetch_result:
1161  * @wrapper: a #GdaThreadWrapper object
 * @may_lock: %TRUE if this function must block the caller until a result is available
1163  * @exp_id: ID of the job for which a result is expected
1164  * @error: a place to store errors, for errors which may have occurred during the execution, or %NULL
1165  *
1166  * Use this method to check if the execution of a function is finished. The function's execution must have
1167  * been requested using gda_thread_wrapper_execute().
1168  *
1169  * Returns: (transfer none) (allow-none): the pointer returned by the execution, or %NULL if no result is available
1170  *
1171  * Since: 4.2
1172  */
1173 gpointer
1174 gda_thread_wrapper_fetch_result (GdaThreadWrapper *wrapper, gboolean may_lock, guint exp_id, GError **error)
1175 {
1176 	ThreadData *td;
1177 	Job *job = NULL;
1178 	gpointer retval = NULL;
1179 
1180 	g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), NULL);
1181 	g_return_val_if_fail (wrapper->priv, NULL);
1182 	g_return_val_if_fail (exp_id > 0, NULL);
1183 
1184 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
1185 	td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1186 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1187 	if (!td) {
1188 		/* nothing to be done for this thread */
1189 		return NULL;
1190 	}
1191 
1192 	do {
1193 		if (td->results) {
1194 			/* see if we have the result we want */
1195 			GSList *list;
1196 			for (list = td->results; list; list = list->next) {
1197 				job = JOB (list->data);
1198 				if (job->job_id == exp_id) {
1199 					/* found it */
1200 					td->results = g_slist_delete_link (td->results, list);
1201 					if (!td->results &&
1202 					    !td->jobs &&
1203 					    (g_async_queue_length (td->from_worker_thread) == 0) &&
1204 					    !td->signals_list) {
1205 						/* remove this ThreadData */
1206 						g_rec_mutex_lock (&(wrapper->priv->rmutex));
1207 						g_hash_table_remove (wrapper->priv->threads_hash, g_thread_self());
1208 						g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1209 					}
1210 					goto out;
1211 				}
1212 			}
1213 		}
1214 
1215 		if (may_lock)
1216 			gda_thread_wrapper_iterate (wrapper, TRUE);
1217 		else {
1218 			gsize len;
1219 			len = g_slist_length (td->results);
1220 			gda_thread_wrapper_iterate (wrapper, FALSE);
1221 			if (g_slist_length (td->results) == len) {
1222 				job = NULL;
1223 				break;
1224 			}
1225 		}
1226 	} while (1);
1227 
1228  out:
1229 	if (job) {
1230 		g_assert (job->type == JOB_TYPE_EXECUTE);
1231 		if (job->u.exe.error) {
1232 			g_propagate_error (error, job->u.exe.error);
1233 			job->u.exe.error = NULL;
1234 		}
1235 		retval = job->u.exe.result;
1236 		job->u.exe.result = NULL;
1237 		job_free (job);
1238 	}
1239 
1240 	return retval;
1241 }
1242 
1243 /**
1244  * gda_thread_wrapper_get_waiting_size:
1245  * @wrapper: a #GdaThreadWrapper object
1246  *
1247  * Use this method to query the number of functions which have been queued to be executed
1248  * but which have not yet been executed.
1249  *
1250  * Returns: the number of jobs not yet executed
1251  *
1252  * Since: 4.2
1253  */
1254 gint
1255 gda_thread_wrapper_get_waiting_size (GdaThreadWrapper *wrapper)
1256 {
1257 	GSList *list;
1258 	gint size = 0;
1259 	ThreadData *td;
1260 
1261 	g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), 0);
1262 	g_return_val_if_fail (wrapper->priv, 0);
1263 
1264 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
1265 	td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1266 	if (!td) {
1267 		/* nothing to be done for this thread */
1268 		g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1269 		return 0;
1270 	}
1271 
1272 	/* lock job consuming queue to avoid that the worker thread "consume" a Job */
1273 	g_async_queue_lock (wrapper->priv->to_worker_thread);
1274 	for (size = 0, list = td->jobs; list; list = list->next) {
1275 		if (!JOB (list->data)->cancelled)
1276 			size++;
1277 	}
1278 	g_async_queue_unlock (wrapper->priv->to_worker_thread);
1279 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1280 	return size;
1281 }
1282 
1283 /*
1284  * Executed in sub thread (or potentially in other threads, in which case will be ignored)
1285  * pushes data into the queue
1286  */
1287 static void
1288 worker_thread_closure_marshal (GClosure *closure,
1289 			       G_GNUC_UNUSED GValue *return_value,
1290 			       guint n_param_values,
1291 			       const GValue *param_values,
1292 			       G_GNUC_UNUSED gpointer invocation_hint,
1293 			       G_GNUC_UNUSED gpointer marshal_data)
1294 {
1295 	SignalSpec *sigspec = (SignalSpec *) closure->data;
1296 
1297 	/* if the signal is not emitted from the working thread then don't do anything */
1298 	if (g_thread_self () !=  sigspec->worker_thread)
1299 		return;
1300 
1301 	/* check that the worker thread is working on a job for which job->reply_queue == sigspec->reply_queue */
1302 	if (sigspec->private &&
1303 	    g_private_get (&worker_thread_current_queue) != sigspec->reply_queue)
1304 		return;
1305 
1306 	gsize i;
1307 	/*
1308 	  for (i = 1; i < n_param_values; i++) {
1309 		g_print ("\t%d => %s\n", i, gda_value_stringify (param_values + i));
1310 	}
1311 	*/
1312 	Job *job= g_new0 (Job, 1);
1313 	job->type = JOB_TYPE_SIGNAL;
1314 	job->u.signal.spec = signal_spec_ref (sigspec);
1315 	job->u.signal.n_param_values = n_param_values - 1;
1316 	job->u.signal.param_values = g_new0 (GValue, job->u.signal.n_param_values);
1317 	for (i = 1; i < n_param_values; i++) {
1318 		const GValue *src;
1319 		GValue *dest;
1320 
1321 		src = param_values + i;
1322 		dest = job->u.signal.param_values + i - 1;
1323 
1324 		g_value_init (dest, G_VALUE_TYPE (src));
1325 		g_value_copy (src, dest);
1326 	}
1327 
1328 	Pipe *jpipe = pipe_ref (sigspec->notif);
1329 	g_async_queue_push (sigspec->reply_queue, job);
1330 	if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
1331 		Job *je = g_new0 (Job, 1);
1332 		je->type = JOB_TYPE_NOTIFICATION_ERROR;
1333 		g_async_queue_push (sigspec->reply_queue, je);
1334 	}
1335 	signal_spec_unlock (sigspec);
1336 }
1337 
1338 /*
1339  * Executed in sub thread (or potentially in other threads, in which case will be ignored)
1340  * pushes data into the queue
1341  */
1342 static void
1343 worker_thread_closure_marshal_anythread (GClosure *closure,
1344 					 G_GNUC_UNUSED GValue *return_value,
1345 					 guint n_param_values,
1346 					 const GValue *param_values,
1347 					 G_GNUC_UNUSED gpointer invocation_hint,
1348 					 G_GNUC_UNUSED gpointer marshal_data)
1349 {
1350 	SignalSpec *sigspec = (SignalSpec *) closure->data;
1351 
1352 	gsize i;
1353 	/*
1354 	  for (i = 1; i < n_param_values; i++) {
1355 		g_print ("\t%d => %s\n", i, gda_value_stringify (param_values + i));
1356 	}
1357 	*/
1358 	Job *job= g_new0 (Job, 1);
1359 	job->type = JOB_TYPE_SIGNAL;
1360 	job->u.signal.spec = signal_spec_ref (sigspec);
1361 	job->u.signal.n_param_values = n_param_values - 1;
1362 	job->u.signal.param_values = g_new0 (GValue, job->u.signal.n_param_values);
1363 	job->notif = NULL;
1364 	for (i = 1; i < n_param_values; i++) {
1365 		const GValue *src;
1366 		GValue *dest;
1367 
1368 		src = param_values + i;
1369 		dest = job->u.signal.param_values + i - 1;
1370 
1371 		g_value_init (dest, G_VALUE_TYPE (src));
1372 		g_value_copy (src, dest);
1373 	}
1374 
1375 	Pipe *jpipe = pipe_ref (sigspec->notif);
1376 	g_async_queue_push (sigspec->reply_queue, job);
1377 	if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
1378 		Job *je = g_new0 (Job, 1);
1379 		je->type = JOB_TYPE_NOTIFICATION_ERROR;
1380 		g_async_queue_push (sigspec->reply_queue, je);
1381 	}
1382 	signal_spec_unlock (sigspec);
1383 }
1384 
1385 /**
1386  * gda_thread_wrapper_connect_raw:
1387  * @wrapper: a #GdaThreadWrapper object
1388  * @instance: the instance to connect to
1389  * @sig_name: a string of the form "signal-name::detail"
1390  * @private_thread:  set to %TRUE if @callback is to be invoked only if the signal has
1391  *    been emitted while in @wrapper's private sub thread (ie. used when @wrapper is executing some functions
1392  *    specified by gda_thread_wrapper_execute() or gda_thread_wrapper_execute_void()), and to %FALSE if the
1393  *    callback is to be invoked whenever the signal is emitted, independently of the thread in which the
1394  *    signal is emitted.
1395  * @private_job: set to %TRUE if @callback is to be invoked only if the signal has
1396  *    been emitted when a job created for the calling thread is being executed, and to %FALSE
1397  *    if @callback has to be called whenever the @sig_name signal is emitted by @instance. Note that
1398  *    this argument is not taken into account if @private_thread is set to %FALSE.
1399  * @callback: (scope call): a #GdaThreadWrapperCallback function
1400  * @data: (closure): data to pass to @callback's calls
1401  *
1402  * Connects a callback function to a signal for a particular object. The difference with g_signal_connect() and
1403  * similar functions are:
1404  * <itemizedlist>
1405  *  <listitem><para>the @callback argument is not a #GCallback function, so the callback signature is not
1406  *    dependent on the signal itself</para></listitem>
 *  <listitem><para>the signal must not have a return value</para></listitem>
1408  *  <listitem><para>the @callback function will be called asynchronously, the caller may need to use
1409  *    gda_thread_wrapper_iterate() to get the notification</para></listitem>
 *  <listitem><para>the @private_job and @private_thread arguments control in which cases the signal is propagated.</para></listitem>
1411  * </itemizedlist>
1412  *
1413  * Also note that signal handling is done asynchronously: when emitted in the worker thread, it
1414  * will be "queued" to be processed in the user thread when it has the chance (when gda_thread_wrapper_iterate()
1415  * is called directly or indirectly). The side effect is that the callback function is usually
1416  * called long after the object emitting the signal has finished emitting it.
1417  *
1418  * To disconnect a signal handler, don't use any of the g_signal_handler_*() functions but the
1419  * gda_thread_wrapper_disconnect() method.
1420  *
1421  * Returns: the handler ID
1422  *
1423  * Since: 4.2
1424  */
1425 gulong
1426 gda_thread_wrapper_connect_raw (GdaThreadWrapper *wrapper,
1427 				gpointer instance,
1428 				const gchar *sig_name,
1429 				gboolean private_thread, gboolean private_job,
1430 				GdaThreadWrapperCallback callback,
1431 				gpointer data)
1432 {
1433 	guint sigid;
1434         SignalSpec *sigspec;
1435 	ThreadData *td;
1436 
1437 	g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), 0);
1438 	g_return_val_if_fail (wrapper->priv, 0);
1439 
1440 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
1441 
1442 	td = get_thread_data (wrapper, g_thread_self());
1443 
1444         sigid = g_signal_lookup (sig_name, /* FIXME: use g_signal_parse_name () */
1445 				 G_TYPE_FROM_INSTANCE (instance));
1446         if (sigid == 0) {
1447                 g_warning (_("Signal does not exist\n"));
1448                 return 0;
1449         }
1450 
1451         sigspec = g_new0 (SignalSpec, 1);
1452 	sigspec->private = private_job;
1453         g_signal_query (sigid, (GSignalQuery*) sigspec);
1454 
1455 	if (((GSignalQuery*) sigspec)->return_type != G_TYPE_NONE) {
1456 		g_warning (_("Signal to connect to must not have a return value\n"));
1457 		g_free (sigspec);
1458 		return 0;
1459 	}
1460 	sigspec->worker_thread = wrapper->priv->worker_thread;
1461 	sigspec->reply_queue = g_async_queue_ref (td->from_worker_thread);
1462 	sigspec->notif = pipe_ref (td->notif);
1463         sigspec->instance = instance;
1464         sigspec->callback = callback;
1465         sigspec->data = data;
1466 	g_mutex_init (&(sigspec->mutex));
1467 	sigspec->ref_count = 1;
1468 
1469 	GClosure *cl;
1470 	cl = g_closure_new_simple (sizeof (GClosure), sigspec);
1471 	if (private_thread)
1472 		g_closure_set_marshal (cl, (GClosureMarshal) worker_thread_closure_marshal);
1473 	else
1474 		g_closure_set_marshal (cl, (GClosureMarshal) worker_thread_closure_marshal_anythread);
1475 	sigspec->signal_id = g_signal_connect_closure (instance, sig_name, cl, FALSE);
1476 
1477 	td->signals_list = g_slist_append (td->signals_list, sigspec);
1478 
1479 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1480 
1481 	return sigspec->signal_id;
1482 }
1483 
1484 static gboolean
1485 find_signal_r_func (G_GNUC_UNUSED GThread *thread, ThreadData *td, gulong *id)
1486 {
1487 	GSList *list;
1488 	for (list = td->signals_list; list; list = list->next) {
1489 		SignalSpec *sigspec;
1490 		sigspec = (SignalSpec*) list->data;
1491 		if (sigspec->signal_id == *id)
1492 			return TRUE;
1493 	}
1494 	return FALSE;
1495 }
1496 
1497 /**
1498  * gda_thread_wrapper_disconnect:
1499  * @wrapper: a #GdaThreadWrapper object
1500  * @id: a handler ID, as returned by gda_thread_wrapper_connect_raw()
1501  *
1502  * Disconnects the emission of a signal, does the opposite of gda_thread_wrapper_connect_raw().
1503  *
1504  * As soon as this method returns, the callback function set when gda_thread_wrapper_connect_raw()
1505  * was called will not be called anymore (even if the object has emitted the signal in the worker
1506  * thread and this signal has not been handled in the user thread).
1507  *
1508  * Since: 4.2
1509  */
1510 void
1511 gda_thread_wrapper_disconnect (GdaThreadWrapper *wrapper, gulong id)
1512 {
1513 	SignalSpec *sigspec = NULL;
1514 	ThreadData *td;
1515 	GSList *list;
1516 
1517 	g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
1518 	g_return_if_fail (wrapper->priv);
1519 
1520 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
1521 
1522 	td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1523 	if (!td) {
1524 		gulong theid = id;
1525 		td = g_hash_table_find (wrapper->priv->threads_hash,
1526 					(GHRFunc) find_signal_r_func, &theid);
1527 	}
1528 	if (!td) {
1529 		g_warning (_("Signal %lu does not exist"), id);
1530 		g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1531 		return;
1532 	}
1533 
1534 	for (list = td->signals_list; list; list = list->next) {
1535 		if (((SignalSpec*) list->data)->signal_id == id) {
1536 			sigspec = (SignalSpec*) list->data;
1537 			break;
1538 		}
1539 	}
1540 
1541 	if (!sigspec) {
1542 		g_warning (_("Signal %lu does not exist"), id);
1543 		g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1544 		return;
1545 	}
1546 
1547 	signal_spec_lock (sigspec);
1548 #ifdef THREAD_WRAPPER_DEBUG
1549 	g_print ("Disconnecting signal %s for wrapper %p\n", ((GSignalQuery*)sigspec)->signal_name, wrapper);
1550 #endif
1551 	td->signals_list = g_slist_remove (td->signals_list, sigspec);
1552 	g_signal_handler_disconnect (sigspec->instance, sigspec->signal_id);
1553 	sigspec->instance = NULL;
1554 	sigspec->signal_id = 0;
1555 	g_async_queue_unref (sigspec->reply_queue);
1556 	sigspec->reply_queue = NULL;
1557 	sigspec->callback = NULL;
1558 	sigspec->data = NULL;
1559 	signal_spec_unref (sigspec);
1560 
1561 	if (!td->results &&
1562 	    !td->jobs &&
1563 	    (g_async_queue_length (td->from_worker_thread) == 0) &&
1564 	    !td->signals_list) {
1565 		/* remove this ThreadData */
1566 		g_hash_table_remove (wrapper->priv->threads_hash, g_thread_self());
1567 	}
1568 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1569 }
1570 
1571 /**
1572  * gda_thread_wrapper_steal_signal:
1573  * @wrapper: a #GdaThreadWrapper object
1574  * @id: a signal ID
1575  *
1576  * Requests that the signal which ID is @id (which has been obtained using gda_thread_wrapper_connect_raw())
1577  * be treated by the calling thread instead of by the thread in which gda_thread_wrapper_connect_raw()
1578  * was called.
1579  *
1580  * Since: 4.2
1581  */
1582 
1583 void
1584 gda_thread_wrapper_steal_signal (GdaThreadWrapper *wrapper, gulong id)
1585 {
1586 	ThreadData *old_td, *new_td = NULL;
1587         g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
1588         g_return_if_fail (wrapper->priv);
1589 	g_return_if_fail (id > 0);
1590 
1591 	g_rec_mutex_lock (&(wrapper->priv->rmutex));
1592 
1593 	gulong theid = id;
1594 	old_td = g_hash_table_find (wrapper->priv->threads_hash,
1595 				    (GHRFunc) find_signal_r_func, &theid);
1596 	if (!old_td) {
1597 		g_warning (_("Signal %lu does not exist"), id);
1598 		g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1599 		return;
1600 	}
1601 
1602 	if (old_td->owner == g_thread_self ()) {
1603 		g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1604 		return;
1605 	}
1606 
1607         /* merge old_td and new_td */
1608         if (old_td->signals_list) {
1609                 GSList *list;
1610                 for (list = old_td->signals_list; list; list = list->next) {
1611                         SignalSpec *sigspec = (SignalSpec*) list->data;
1612 			if (sigspec->signal_id == id) {
1613 				new_td = get_thread_data (wrapper, g_thread_self ());
1614 				new_td->signals_list = g_slist_prepend (new_td->signals_list, sigspec);
1615 				old_td->signals_list = g_slist_remove (old_td->signals_list, sigspec);
1616 				g_async_queue_unref (sigspec->reply_queue);
1617 				sigspec->reply_queue = g_async_queue_ref (new_td->from_worker_thread);
1618 				break;
1619 			}
1620                 }
1621         }
1622 
1623 	g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1624 }
1625