1 /*
2 * Copyright (C) 2009 Bas Driessen <bas.driessen@xobas.com>
3 * Copyright (C) 2009 - 2013 Vivien Malerba <malerba@gnome-db.org>
4 * Copyright (C) 2010 David King <davidk@openismus.com>
5 * Copyright (C) 2010 Jonh Wendell <jwendell@gnome.org>
6 *
7 * This library is free software; you can redistribute it and/or
8 * modify it under the terms of the GNU Lesser General Public
9 * License as published by the Free Software Foundation; either
10 * version 2 of the License, or (at your option) any later version.
11 *
12 * This library is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this library; if not, write to the
19 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20 * Boston, MA 02110-1301, USA.
21 */
22
23 //#define DEBUG_NOTIFICATION
24
25 #include <string.h>
26 #include <glib/gi18n-lib.h>
27 #include "gda-thread-wrapper.h"
28 #include <libgda/gda-mutex.h>
29 #include <gobject/gvaluecollector.h>
30 #include <libgda/gda-debug-macros.h>
31 #include <libgda/gda-value.h>
32 #include <unistd.h>
33 #include <sys/stat.h>
34 #ifdef G_OS_WIN32
35 #include <fcntl.h>
36 #include <io.h>
37 #endif
38
39 /* this GPrivate holds a pointer to the GAsyncQueue used by the job being currently treated
40 * by the worker thread. It is used to avoid creating signal data for threads for which
41 * no job is being performed
42 */
43 GPrivate worker_thread_current_queue;
44
45 typedef struct _ThreadData ThreadData;
46 typedef struct _Job Job;
47 typedef struct _SignalSpec SignalSpec;
48 typedef struct _Pipe Pipe;
49
50 struct _GdaThreadWrapperPrivate {
51 GRecMutex rmutex;
52 guint next_job_id;
53 GThread *worker_thread;
54 GAsyncQueue *to_worker_thread;
55
56 GHashTable *threads_hash; /* key = a GThread, value = a #ThreadData pointer */
57 GHashTable *pipes_hash; /* key = a GThread, value = a #Pipe pointer */
58 };
59
60 /*
61 * Threads synchronization with notifications
62 *
63 * Both Unix and Windows create a set of 2 file descriptors, the one at potision 0 for reading
64 * and the one at position 1 for writing.
65 */
66 struct _Pipe {
67 GThread *thread;
68 int fds[2]; /* [0] for reading and [1] for writing */
69 GIOChannel *ioc;
70
71 GMutex mutex; /* locks @ref_count */
72 guint ref_count;
73 };
74
75 #define pipe_lock(x) g_mutex_lock(& (((Pipe*)x)->mutex))
76 #define pipe_unlock(x) g_mutex_unlock(& (((Pipe*)x)->mutex))
77
78 static Pipe *
pipe_ref(Pipe * p)79 pipe_ref (Pipe *p)
80 {
81 if (p) {
82 pipe_lock (p);
83 p->ref_count++;
84 #ifdef DEBUG_NOTIFICATION
85 g_print ("Pipe %p ++: %u\n", p, p->ref_count);
86 #endif
87 pipe_unlock (p);
88 }
89 return p;
90 }
91
92 static void
pipe_unref(Pipe * p)93 pipe_unref (Pipe *p)
94 {
95 if (p) {
96 pipe_lock (p);
97 p->ref_count--;
98 #ifdef DEBUG_NOTIFICATION
99 g_print ("Pipe %p --: %u\n", p, p->ref_count);
100 #endif
101 if (p->ref_count == 0) {
102 /* destroy @p */
103 GMutex *m = &(p->mutex);
104
105 if (p->ioc)
106 g_io_channel_unref (p->ioc);
107 #ifdef G_OS_WIN32
108 if (p->fds[0] >= 0)
109 _close (p->fds[0]);
110 if (p->fds[1] >= 0)
111 _close (p->fds[1]);
112 #else
113 if (p->fds[0] >= 0)
114 close (p->fds[0]);
115 if (p->fds[1] >= 0)
116 close (p->fds[1]);
117 #endif
118
119 #ifdef DEBUG_NOTIFICATION
120 g_print ("Destroyed Pipe %p\n", p);
121 #endif
122 g_free (p);
123
124 g_mutex_unlock (m);
125 g_mutex_clear (m);
126 }
127 else
128 pipe_unlock (p);
129 }
130 }
131
132 /*
133 * May return %NULL
134 */
135 static Pipe *
pipe_new(void)136 pipe_new (void)
137 {
138 Pipe *p;
139
140 p = g_new0 (Pipe, 1);
141 g_mutex_init (&(p->mutex));
142 p->ref_count = 1;
143 p->thread = g_thread_self ();
144 #ifdef G_OS_WIN32
145 if (_pipe (p->fds, 156, O_BINARY) != 0) {
146 #else
147 if (pipe (p->fds) != 0) {
148 #endif
149 pipe_unref (p);
150 p = NULL;
151 goto out;
152 }
153 #ifdef G_OS_WIN32
154 p->ioc = g_io_channel_win32_new_fd (p->fds [0]);
155 #else
156 p->ioc = g_io_channel_unix_new (p->fds [0]);
157 #endif
158
159 /* we want raw data */
160 if (g_io_channel_set_encoding (p->ioc, NULL, NULL) != G_IO_STATUS_NORMAL) {
161 g_warning ("Can't set IO encoding to NULL\n");
162 pipe_unref (p);
163 p = NULL;
164 }
165
166 out:
167 #ifdef DEBUG_NOTIFICATION
168 g_print ("Created Pipe %p\n", p);
169 #endif
170 return p;
171 }
172
173 static Pipe *
174 get_pipe (GdaThreadWrapper *wrapper, GThread *thread)
175 {
176 Pipe *p = NULL;
177 g_rec_mutex_lock (&(wrapper->priv->rmutex));
178 if (wrapper->priv->pipes_hash)
179 p = g_hash_table_lookup (wrapper->priv->pipes_hash, thread);
180 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
181 return p;
182 }
183
184 /*
185 * One instance for each job to execute (and its result) and
186 * one instance for each emitted signal
187 *
188 * Created and destroyed exclusively by the thread(s) using the GdaThreadWrapper object,
189 * except for the job where job->type == JOB_TYPE_DESTROY which is destroyed by the sub thread.
190 *
191 * Passed to the sub job through obj->to_worker_thread
192 */
193 typedef enum {
194 JOB_TYPE_EXECUTE, /* a "real" job for the GdaThreadWrapper */
195 JOB_TYPE_DESTROY, /* internal to signal the internal thread to shutdown */
196 JOB_TYPE_SIGNAL, /* a signal from an object in the internal thread */
197 JOB_TYPE_NOTIFICATION_ERROR /* internal to signal notification error and sutdown */
198 } JobType;
199 struct _Job {
200 JobType type;
201 gboolean processed; /* TRUE when worker thread has started to work on it */
202 gboolean cancelled; /* TRUE when job has been cancelled before being executed */
203 guint job_id;
204 GdaThreadWrapperFunc func;
205 GdaThreadWrapperVoidFunc void_func;
206 gpointer arg;
207 GDestroyNotify arg_destroy_func;
208 GAsyncQueue *reply_queue; /* holds a ref to it */
209 Pipe *notif; /* if not %NULL, notification when job finished */
210
211 /* result part */
212 union {
213 struct {
214 gpointer result;
215 GError *error;
216 } exe;
217 struct {
218 SignalSpec *spec;
219 guint n_param_values;
220 GValue *param_values; /* array of GValue structures */
221 } signal;
222 } u;
223 };
224 #define JOB(x) ((Job*)(x))
225 static void
226 job_free (Job *job)
227 {
228 pipe_unref (job->notif);
229 if (job->arg && job->arg_destroy_func)
230 job->arg_destroy_func (job->arg);
231 if (job->reply_queue)
232 g_async_queue_unref (job->reply_queue);
233
234 if (job->type == JOB_TYPE_EXECUTE) {
235 if (job->u.exe.error)
236 g_error_free (job->u.exe.error);
237 }
238 else if (job->type == JOB_TYPE_SIGNAL) {
239 guint i;
240 for (i = 0; i < job->u.signal.n_param_values; i++) {
241 GValue *value = job->u.signal.param_values + i;
242 if (G_VALUE_TYPE (value) != GDA_TYPE_NULL)
243 g_value_reset (value);
244 }
245 g_free (job->u.signal.param_values);
246 }
247 else if (job->type == JOB_TYPE_DESTROY) {
248 /* nothing to do here */
249 }
250 else if (job->type == JOB_TYPE_NOTIFICATION_ERROR) {
251 /* nothing to do here */
252 }
253 else
254 g_assert_not_reached ();
255 g_free (job);
256 }
257
258 /*
259 * Signal specification, created when using _connect().
260 *
261 * A SignalSpec only exists as long as the correcponding ThreadData exists.
262 */
263 struct _SignalSpec {
264 GSignalQuery sigprop; /* must be first */
265
266 gboolean private;
267 GThread *worker_thread;
268 GAsyncQueue *reply_queue; /* a ref is held here */
269 Pipe *notif; /* if not %NULL, notification */
270
271 gpointer instance;
272 gulong signal_id;
273
274 GdaThreadWrapperCallback callback;
275 gpointer data;
276
277 GMutex mutex;
278 guint ref_count;
279 };
280
281 #define signal_spec_lock(x) g_mutex_lock(& (((SignalSpec*)x)->mutex))
282 #define signal_spec_unlock(x) g_mutex_unlock(& (((SignalSpec*)x)->mutex))
283
284 /*
285 * call signal_spec_lock() before calling this function
286 */
287 static void
288 signal_spec_unref (SignalSpec *sigspec)
289 {
290 sigspec->ref_count --;
291 if (sigspec->ref_count == 0) {
292 signal_spec_unlock (sigspec);
293 g_mutex_clear (&(sigspec->mutex));
294 if (sigspec->instance && (sigspec->signal_id > 0))
295 g_signal_handler_disconnect (sigspec->instance, sigspec->signal_id);
296 if (sigspec->reply_queue)
297 g_async_queue_unref (sigspec->reply_queue);
298 pipe_unref (sigspec->notif);
299 g_free (sigspec);
300 }
301 else
302 signal_spec_unlock (sigspec);
303 }
304
305 /*
306 * call signal_spec_unlock() after this function
307 */
308 static SignalSpec *
309 signal_spec_ref (SignalSpec *sigspec)
310 {
311 signal_spec_lock (sigspec);
312 sigspec->ref_count ++;
313 return sigspec;
314 }
315
316 /*
317 * Per thread accounting data.
318 * Each new job increases the ref count
319 */
320 struct _ThreadData {
321 GThread *owner;
322 GSList *signals_list; /* list of SignalSpec pointers, owns all the structures */
323 GAsyncQueue *from_worker_thread; /* holds a ref to it */
324
325 GSList *jobs; /* list of Job pointers not yet handled, or being handled (ie not yet poped from @from_worker_thread) */
326 GSList *results; /* list of Job pointers to completed jobs (ie. poped from @from_worker_thread) */
327
328 Pipe *notif; /* if not %NULL, notification when any job has finished */
329 };
330 #define THREAD_DATA(x) ((ThreadData*)(x))
331
332 static ThreadData *
333 get_thread_data (GdaThreadWrapper *wrapper, GThread *thread)
334 {
335 ThreadData *td;
336 g_rec_mutex_lock (&(wrapper->priv->rmutex));
337 td = g_hash_table_lookup (wrapper->priv->threads_hash, thread);
338 if (!td) {
339 Pipe *p;
340 p = get_pipe (wrapper, thread);
341
342 td = g_new0 (ThreadData, 1);
343 td->owner = thread;
344 td->from_worker_thread = g_async_queue_new_full ((GDestroyNotify) job_free);
345 td->jobs = NULL;
346 td->results = NULL;
347 td->notif = pipe_ref (p);
348 g_hash_table_insert (wrapper->priv->threads_hash, thread, td);
349 }
350 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
351 return td;
352 }
353
354 static void
355 thread_data_free (ThreadData *td)
356 {
357 pipe_unref (td->notif);
358 g_async_queue_unref (td->from_worker_thread);
359 td->from_worker_thread = NULL;
360 g_assert (!td->jobs);
361 if (td->results) {
362 g_slist_foreach (td->results, (GFunc) job_free, NULL);
363 g_slist_free (td->results);
364 td->results = NULL;
365 }
366 if (td->signals_list) {
367 GSList *list;
368 for (list = td->signals_list; list; list = list->next) {
369 /* clear SignalSpec */
370 SignalSpec *sigspec = (SignalSpec*) list->data;
371
372 signal_spec_lock (sigspec);
373 g_signal_handler_disconnect (sigspec->instance, sigspec->signal_id);
374 sigspec->instance = NULL;
375 sigspec->signal_id = 0;
376 g_async_queue_unref (sigspec->reply_queue);
377 sigspec->reply_queue = NULL;
378 sigspec->callback = NULL;
379 sigspec->data = NULL;
380 signal_spec_unref (sigspec);
381 }
382 g_slist_free (td->signals_list);
383 }
384 g_free (td);
385 }
386
387 static void gda_thread_wrapper_class_init (GdaThreadWrapperClass *klass);
388 static void gda_thread_wrapper_init (GdaThreadWrapper *wrapper, GdaThreadWrapperClass *klass);
389 static void gda_thread_wrapper_dispose (GObject *object);
390 static void gda_thread_wrapper_set_property (GObject *object,
391 guint param_id,
392 const GValue *value,
393 GParamSpec *pspec);
394 static void gda_thread_wrapper_get_property (GObject *object,
395 guint param_id,
396 GValue *value,
397 GParamSpec *pspec);
398
399 /* properties */
400 enum {
401 PROP_0
402 };
403
404 static GObjectClass *parent_class = NULL;
405
406 /*
407 * GdaThreadWrapper class implementation
408 * @klass:
409 */
410 static void
411 gda_thread_wrapper_class_init (GdaThreadWrapperClass *klass)
412 {
413 GObjectClass *object_class = G_OBJECT_CLASS (klass);
414
415 parent_class = g_type_class_peek_parent (klass);
416
417 /* Properties */
418 object_class->set_property = gda_thread_wrapper_set_property;
419 object_class->get_property = gda_thread_wrapper_get_property;
420
421 object_class->dispose = gda_thread_wrapper_dispose;
422 }
423
424 static void
425 clean_notifications (GdaThreadWrapper *wrapper, ThreadData *td)
426 {
427 #ifdef DEBUG_NOTIFICATION
428 g_print ("%s(Pipe:%p)\n", __FUNCTION__, td->notif);
429 #endif
430 GSList *list;
431 for (list = td->signals_list; list; list = list->next) {
432 SignalSpec *sigspec;
433 sigspec = (SignalSpec*) list->data;
434 signal_spec_lock (sigspec);
435 if (sigspec->notif == td->notif) {
436 pipe_unref (sigspec->notif);
437 sigspec->notif = NULL;
438 }
439 signal_spec_unlock (sigspec);
440 }
441
442 pipe_unref (td->notif);
443 td->notif = NULL;
444
445 g_hash_table_remove (wrapper->priv->pipes_hash, td->owner);
446 }
447
448 /*
449 * @wrapper: (allow-none): may be %NULL
450 * @td: (allow-none): may be %NULL
451 *
452 * Either @wrapper and @td are both NULL, or they are both NOT NULL
453 *
454 * It is assumed that pipe_ref(p) has been called before calling this function
455 */
456 static gboolean
457 write_notification (GdaThreadWrapper *wrapper, ThreadData *td,
458 Pipe *p, GdaThreadNotificationType type, guint job_id)
459 {
460 g_assert ((wrapper && td) || (!wrapper && !td));
461
462 if (!p || (p->fds[1] < 0)) {
463 pipe_unref (p);
464 return TRUE;
465 }
466 #ifdef DEBUG_NOTIFICATION_FORCE
467 static guint c = 0;
468 c++;
469 if (c == 4)
470 goto onerror;
471 #endif
472
473 GdaThreadNotification notif;
474 ssize_t nw;
475 notif.type = type;
476 notif.job_id = job_id;
477 #ifdef G_OS_WIN32
478 nw = _write (p->fds[1], ¬if, sizeof (notif));
479 #else
480 nw = write (p->fds[1], ¬if, sizeof (notif));
481 #endif
482 if (nw != sizeof (notif)) {
483 /* Error */
484 goto onerror;
485 }
486 #ifdef DEBUG_NOTIFICATION
487 g_print ("Wrote notification %d.%u to pipe %p\n", type, job_id, p);
488 #endif
489 pipe_unref (p);
490 return TRUE;
491
492 onerror:
493 #ifdef DEBUG_NOTIFICATION
494 g_print ("%s(): returned FALSE\n", __FUNCTION__);
495 g_print ("Closed FD %d\n", p->fds [1]);
496 #endif
497 /* close the writing end of the pipe */
498 #ifdef G_OS_WIN32
499 _close (p->fds [1]);
500 #else
501 close (p->fds [1]);
502 #endif
503 p->fds [1] = -1;
504 pipe_unref (p);
505 if (td)
506 clean_notifications (wrapper, td);
507 return FALSE;
508 }
509
510 /*
511 * Executed in the sub thread:
512 * takes a Job in (from the wrapper->priv->to_worker_thread queue) and creates a new Result which
513 * it pushed to Job->reply_queue
514 */
515 static gpointer
516 worker_thread_entry_point (GAsyncQueue *to_worker_thread)
517 {
518 GAsyncQueue *in;
519
520 in = to_worker_thread;
521
522 while (1) {
523 Job *job;
524
525 /* pop next job and mark it as being processed */
526 g_private_set (&worker_thread_current_queue, NULL);
527 g_async_queue_lock (in);
528 job = g_async_queue_pop_unlocked (in);
529 job->processed = TRUE;
530 g_private_set (&worker_thread_current_queue, job->reply_queue);
531 g_async_queue_unlock (in);
532
533 if (job->cancelled) {
534 job_free (job);
535 continue;
536 }
537
538 if (job->type == JOB_TYPE_DESTROY) {
539 g_assert (! job->arg_destroy_func);
540 job_free (job);
541 #ifdef THREAD_WRAPPER_DEBUG
542 g_print ("... exit sub thread %p for wrapper\n", g_thread_self ());
543 #endif
544
545 /* exit sub thread */
546 break;
547 }
548 else if (job->type == JOB_TYPE_EXECUTE) {
549 if (job->func)
550 job->u.exe.result = job->func (job->arg, &(job->u.exe.error));
551 else {
552 job->u.exe.result = NULL;
553 job->void_func (job->arg, &(job->u.exe.error));
554 }
555
556 guint jid = job->job_id;
557 Pipe *jpipe = pipe_ref (job->notif);
558 g_async_queue_push (job->reply_queue, job);
559 if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid)) {
560 Job *je = g_new0 (Job, 1);
561 je->type = JOB_TYPE_NOTIFICATION_ERROR;
562 g_async_queue_push (job->reply_queue, je);
563 }
564 }
565 else
566 g_assert_not_reached ();
567 }
568
569 g_async_queue_unref (in);
570
571 return NULL;
572 }
573
574 static void
575 gda_thread_wrapper_init (GdaThreadWrapper *wrapper, G_GNUC_UNUSED GdaThreadWrapperClass *klass)
576 {
577 g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
578
579 wrapper->priv = g_new0 (GdaThreadWrapperPrivate, 1);
580 g_rec_mutex_init (&(wrapper->priv->rmutex));
581 wrapper->priv->next_job_id = 1;
582
583 wrapper->priv->threads_hash = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify) thread_data_free);
584
585 wrapper->priv->to_worker_thread = g_async_queue_new ();
586 wrapper->priv->worker_thread = g_thread_new ("worker",
587 (GThreadFunc) worker_thread_entry_point,
588 g_async_queue_ref (wrapper->priv->to_worker_thread)); /* inc. ref for sub thread usage */
589
590 wrapper->priv->pipes_hash = NULL;
591
592 #ifdef THREAD_WRAPPER_DEBUG
593 g_print ("... new wrapper %p, worker_thread=%p\n", wrapper, wrapper->priv->worker_thread);
594 #endif
595 }
596
597 static gboolean
598 thread_data_remove_jobs_func (G_GNUC_UNUSED GThread *key, ThreadData *td, G_GNUC_UNUSED gpointer data)
599 {
600 if (td->jobs) {
601 GSList *list;
602 for (list = td->jobs; list; list = list->next) {
603 Job *job = JOB (list->data);
604 if (job->processed) {
605 /* we can't free that job because it is probably being used by the
606 * worker thread, so just emit a warning
607 */
608 if (job->arg_destroy_func) {
609 g_warning ("The argument of Job ID %d will be destroyed by sub thread",
610 job->job_id);
611 }
612 }
613 else {
614 /* cancel this job */
615 job->cancelled = TRUE;
616 if (job->arg && job->arg_destroy_func) {
617 job->arg_destroy_func (job->arg);
618 job->arg = NULL;
619 }
620 }
621 }
622 g_slist_free (td->jobs);
623 td->jobs = NULL;
624 }
625 return TRUE; /* remove this ThreadData */
626 }
627
628 static void
629 gda_thread_wrapper_dispose (GObject *object)
630 {
631 GdaThreadWrapper *wrapper = (GdaThreadWrapper *) object;
632
633 g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
634
635 if (wrapper->priv) {
636 Job *job = g_new0 (Job, 1);
637 job->type = JOB_TYPE_DESTROY;
638 job->notif = NULL;
639 g_async_queue_push (wrapper->priv->to_worker_thread, job);
640 #ifdef THREAD_WRAPPER_DEBUG
641 g_print ("... pushed JOB_TYPE_DESTROY for wrapper %p\n", wrapper);
642 #endif
643
644 g_async_queue_lock (wrapper->priv->to_worker_thread);
645 if (wrapper->priv->threads_hash) {
646 g_hash_table_foreach_remove (wrapper->priv->threads_hash,
647 (GHRFunc) thread_data_remove_jobs_func, NULL);
648 g_hash_table_destroy (wrapper->priv->threads_hash);
649 }
650 g_async_queue_unlock (wrapper->priv->to_worker_thread);
651 g_async_queue_unref (wrapper->priv->to_worker_thread);
652 wrapper->priv->worker_thread = NULL; /* side note: don't wait for sub thread to terminate */
653
654 g_rec_mutex_clear (&(wrapper->priv->rmutex));
655
656 if (wrapper->priv->pipes_hash)
657 g_hash_table_destroy (wrapper->priv->pipes_hash);
658
659 g_free (wrapper->priv);
660 wrapper->priv = NULL;
661 }
662
663 /* chain to parent class */
664 parent_class->dispose (object);
665 }
666
667 /* module error */
668 GQuark gda_thread_wrapper_error_quark (void)
669 {
670 static GQuark quark;
671 if (!quark)
672 quark = g_quark_from_static_string ("gda_thread_wrapper_error");
673 return quark;
674 }
675
676 /**
677 * gda_thread_wrapper_get_type:
678 *
679 * Registers the #GdaThreadWrapper class on the GLib type system.
680 *
681 * Returns: the GType identifying the class.
682 */
683 GType
684 gda_thread_wrapper_get_type (void)
685 {
686 static GType type = 0;
687
688 if (G_UNLIKELY (type == 0)) {
689 static GMutex registering;
690 static const GTypeInfo info = {
691 sizeof (GdaThreadWrapperClass),
692 (GBaseInitFunc) NULL,
693 (GBaseFinalizeFunc) NULL,
694 (GClassInitFunc) gda_thread_wrapper_class_init,
695 NULL,
696 NULL,
697 sizeof (GdaThreadWrapper),
698 0,
699 (GInstanceInitFunc) gda_thread_wrapper_init,
700 0
701 };
702
703 g_mutex_lock (®istering);
704 if (type == 0)
705 type = g_type_register_static (G_TYPE_OBJECT, "GdaThreadWrapper", &info, 0);
706 g_mutex_unlock (®istering);
707 }
708 return type;
709 }
710
711 static void
712 gda_thread_wrapper_set_property (GObject *object,
713 guint param_id,
714 G_GNUC_UNUSED const GValue *value,
715 GParamSpec *pspec)
716 {
717 GdaThreadWrapper *wrapper;
718
719 wrapper = GDA_THREAD_WRAPPER (object);
720 if (wrapper->priv) {
721 switch (param_id) {
722 default:
723 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
724 break;
725 }
726 }
727 }
728
729 static void
730 gda_thread_wrapper_get_property (GObject *object,
731 guint param_id,
732 G_GNUC_UNUSED GValue *value,
733 GParamSpec *pspec)
734 {
735 GdaThreadWrapper *wrapper;
736
737 wrapper = GDA_THREAD_WRAPPER (object);
738 if (wrapper->priv) {
739 switch (param_id) {
740 default:
741 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
742 break;
743 }
744 }
745 }
746
747 /**
748 * gda_thread_wrapper_new:
749 *
750 * Creates a new #GdaThreadWrapper object
751 *
752 * Returns: a new #GdaThreadWrapper object, or %NULL if threads are not supported/enabled
753 *
754 * Since: 4.2
755 */
756 GdaThreadWrapper *
757 gda_thread_wrapper_new (void)
758 {
759 return (GdaThreadWrapper *) g_object_new (GDA_TYPE_THREAD_WRAPPER, NULL);
760 }
761
762 /**
763 * gda_thread_wrapper_get_io_channel:
764 * @wrapper: a #GdaThreadWrapper object
765 *
766 * Allow @wrapper to notify when an execution job is finished, by making its exec ID
767 * readable through a new #GIOChannel. This function is useful when the notification needs
768 * to be included into a main loop. This also notifies that signals (emitted by objects in
769 * @wrapper's internal thread) are available.
770 *
771 * The returned #GIOChannel will have something to read everytime an execution job is finished
772 * for an execution job submitted from the calling thread. The user whould read #GdaThreadNotification
773 * structures from the channel and analyse its contents to call gda_thread_wrapper_iterate()
774 * or gda_thread_wrapper_fetch_result().
775 *
776 * Note1: the new communication channel will only be operational for jobs submitted after this
777 * function returns, and for signals which have been connected after this function returns. A safe
778 * practice is to call this function before the @wrapper object has been used.
779 *
780 * Note2: this function will return the same #GIOChannel everytime it's called from the same thread.
781 *
782 * Note3: if the usage of the returned #GIOChannel reveals an error, then g_io_channel_shutdown() and
783 * g_io_channel_unref() should be called on the #GIOChannel to let @wrapper know it should not use
784 * that object anymore.
785 *
786 * Returns: (transfer none): a new #GIOChannel, or %NULL if it could not be created
787 *
788 * Since: 4.2.9
789 */
790 GIOChannel *
791 gda_thread_wrapper_get_io_channel (GdaThreadWrapper *wrapper)
792 {
793 Pipe *p;
794 GThread *th;
795 g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), NULL);
796 g_return_val_if_fail (wrapper->priv, NULL);
797
798 th = g_thread_self ();
799 g_rec_mutex_lock (&(wrapper->priv->rmutex));
800 p = get_pipe (wrapper, th);
801 if (!p) {
802 p = pipe_new ();
803 if (p) {
804 if (! wrapper->priv->pipes_hash)
805 wrapper->priv->pipes_hash = g_hash_table_new_full (g_direct_hash,
806 g_direct_equal,
807 NULL,
808 (GDestroyNotify) pipe_unref);
809 g_hash_table_insert (wrapper->priv->pipes_hash, th, p);
810 }
811 }
812 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
813 if (p)
814 return p->ioc;
815 else
816 return NULL;
817 }
818
819 /**
820 * gda_thread_wrapper_unset_io_channel:
821 * @wrapper: a #GdaThreadWrapper
822 *
823 * Does the opposite of gda_thread_wrapper_get_io_channel()
824 *
825 * Since: 4.2.9
826 */
827 void
828 gda_thread_wrapper_unset_io_channel (GdaThreadWrapper *wrapper)
829 {
830 GThread *th;
831 ThreadData *td;
832
833 g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
834 g_return_if_fail (wrapper->priv);
835
836 g_rec_mutex_lock (&(wrapper->priv->rmutex));
837 th = g_thread_self ();
838 td = g_hash_table_lookup (wrapper->priv->threads_hash, th);
839 if (td) {
840 Pipe *p;
841 p = get_pipe (wrapper, th);
842 if (p)
843 clean_notifications (wrapper, td);
844 }
845 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
846 }
847
848 /**
849 * gda_thread_wrapper_execute:
850 * @wrapper: a #GdaThreadWrapper object
851 * @func: the function to execute, not %NULL
852 * @arg: (allow-none): argument to pass to @func, or %NULL
853 * @arg_destroy_func: (allow-none): function to be called when the execution has finished, to destroy @arg, or %NULL
854 * @error: a place to store errors, for errors occurring in this method, not errors occurring while @func
855 * is executed, or %NULL
856 *
857 * Make @wrapper execute the @func function with the @arg argument (along with a #GError which is not @error)
858 * in the sub thread managed by @wrapper. To execute a function which does not return anything,
859 * use gda_thread_wrapper_execute_void().
860 *
861 * This method returns immediately, and the caller then needs to use gda_thread_wrapper_fetch_result() to
862 * check if the execution has finished and get the result.
863 *
864 * Once @func's execution is finished, if @arg is not %NULL, the @arg_destroy_func destruction function is called
865 * on @arg. This call occurs in the thread calling gda_thread_wrapper_fetch_result().
866 *
867 * If an error occurred in this function, then the @arg_destroy_func function is not called to free @arg.
868 *
869 * Returns: the job ID, or 0 if an error occurred
870 *
871 * Since: 4.2
872 */
873 guint
874 gda_thread_wrapper_execute (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func,
875 gpointer arg, GDestroyNotify arg_destroy_func, G_GNUC_UNUSED GError **error)
876 {
877 Job *job;
878 guint id;
879 ThreadData *td;
880
881 g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), 0);
882 g_return_val_if_fail (wrapper->priv, 0);
883 g_return_val_if_fail (func, 0);
884
885 td = get_thread_data (wrapper, g_thread_self());
886
887 job = g_new0 (Job, 1);
888 job->type = JOB_TYPE_EXECUTE;
889 job->processed = FALSE;
890 job->cancelled = FALSE;
891 g_rec_mutex_lock (&(wrapper->priv->rmutex));
892 job->job_id = wrapper->priv->next_job_id++;
893 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
894 job->func = func;
895 job->void_func = NULL;
896 job->arg = arg;
897 job->arg_destroy_func = arg_destroy_func;
898 job->reply_queue = g_async_queue_ref (td->from_worker_thread);
899 job->notif = pipe_ref (td->notif);
900
901 id = job->job_id;
902 #ifdef THREAD_WRAPPER_DEBUG
903 g_print ("... submitted job %d for wrapper %p from thread %p\n", id, wrapper, g_thread_self());
904 #endif
905
906 td->jobs = g_slist_append (td->jobs, job);
907
908 if (g_thread_self () == wrapper->priv->worker_thread) {
909 job->processed = TRUE;
910 if (job->func)
911 job->u.exe.result = job->func (job->arg, &(job->u.exe.error));
912 else {
913 job->u.exe.result = NULL;
914 job->void_func (job->arg, &(job->u.exe.error));
915 }
916 #ifdef THREAD_WRAPPER_DEBUG
917 g_print ("... IMMEDIATELY done job %d => %p\n", job->job_id, job->u.exe.result);
918 #endif
919 guint jid = job->job_id;
920 Pipe *jpipe = pipe_ref (job->notif);
921 g_async_queue_push (job->reply_queue, job);
922 write_notification (wrapper, td, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid);
923 }
924 else
925 g_async_queue_push (wrapper->priv->to_worker_thread, job);
926
927 return id;
928 }
929
930 /**
931 * gda_thread_wrapper_execute_void:
932 * @wrapper: a #GdaThreadWrapper object
933 * @func: the function to execute, not %NULL
934 * @arg: (allow-none): argument to pass to @func
935 * @arg_destroy_func: (allow-none): function to be called when the execution has finished, to destroy @arg, or %NULL
936 * @error: a place to store errors, for errors occurring in this method, not errors occurring while @func
937 * is executed, or %NULL
938 *
939 * Make @wrapper execute the @func function with the @arg argument (along with a #GError which is not @error)
940 * in the sub thread managed by @wrapper. To execute a function which returns some pointer,
941 * use gda_thread_wrapper_execute().
942 *
943 * This method returns immediately. Calling gda_thread_wrapper_fetch_result() is not necessary as @func
944 * does not return any result. However, it may be necessary to call gda_thread_wrapper_iterate() to give @wrapper a
945 * chance to execute the @arg_destroy_func function if not %NULL (note that gda_thread_wrapper_iterate() is
946 * called by gda_thread_wrapper_fetch_result() itself).
947 *
948 * Once @func's execution is finished, if @arg is not %NULL, the @arg_destroy_func destruction function is called
949 * on @arg. This call occurs in the thread calling gda_thread_wrapper_fetch_result().
950 *
951 * If an error occurred in this function, then the @arg_destroy_func function is not called to free @arg.
952 *
953 * Returns: the job ID, or 0 if an error occurred
954 *
955 * Since: 4.2
956 */
957 guint
958 gda_thread_wrapper_execute_void (GdaThreadWrapper *wrapper, GdaThreadWrapperVoidFunc func,
959 gpointer arg, GDestroyNotify arg_destroy_func, G_GNUC_UNUSED GError **error)
960 {
961 Job *job;
962 guint id;
963 ThreadData *td;
964
965 g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), 0);
966 g_return_val_if_fail (wrapper->priv, 0);
967 g_return_val_if_fail (func, 0);
968
969 td = get_thread_data (wrapper, g_thread_self());
970
971 job = g_new0 (Job, 1);
972 job->type = JOB_TYPE_EXECUTE;
973 job->processed = FALSE;
974 job->cancelled = FALSE;
975 g_rec_mutex_lock (&(wrapper->priv->rmutex));
976 job->job_id = wrapper->priv->next_job_id++;
977 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
978 job->func = NULL;
979 job->void_func = func;
980 job->arg = arg;
981 job->arg_destroy_func = arg_destroy_func;
982 job->reply_queue = g_async_queue_ref (td->from_worker_thread);
983 job->notif = pipe_ref (td->notif);
984
985 id = job->job_id;
986 #ifdef THREAD_WRAPPER_DEBUG
987 g_print ("... submitted VOID job %d\n", id);
988 #endif
989
990 td->jobs = g_slist_append (td->jobs, job);
991
992 if (g_thread_self () == wrapper->priv->worker_thread) {
993 job->processed = TRUE;
994 if (job->func)
995 job->u.exe.result = job->func (job->arg, &(job->u.exe.error));
996 else {
997 job->u.exe.result = NULL;
998 job->void_func (job->arg, &(job->u.exe.error));
999 }
1000 #ifdef THREAD_WRAPPER_DEBUG
1001 g_print ("... IMMEDIATELY done VOID job %d => %p\n", job->job_id, job->u.exe.result);
1002 #endif
1003 guint jid = job->job_id;
1004 Pipe *jpipe = pipe_ref (job->notif);
1005 g_async_queue_push (job->reply_queue, job);
1006 write_notification (wrapper, td, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid);
1007 }
1008 else
1009 g_async_queue_push (wrapper->priv->to_worker_thread, job);
1010
1011 return id;
1012 }
1013
1014 /**
1015 * gda_thread_wrapper_cancel:
1016 * @wrapper: a #GdaThreadWrapper object
1017 * @id: the ID of a job as returned by gda_thread_wrapper_execute() or gda_thread_wrapper_execute_void()
1018 *
1019 * Cancels a job not yet executed. This may fail for the following reasons:
1020 * <itemizedlist>
1021 * <listitem><para>the job @id could not be found, either because it has already been treated or because
1022 * it does not exist or because it was created in another thread</para></listitem>
1023 * <listitem><para>the job @id is currently being treated by the worker thread</para></listitem>
1024 * </itemizedlist>
1025 *
1026 * Returns: %TRUE if the job has been cancelled, or %FALSE in any other case.
1027 *
1028 * Since: 4.2
1029 */
1030 gboolean
1031 gda_thread_wrapper_cancel (GdaThreadWrapper *wrapper, guint id)
1032 {
1033 GSList *list;
1034 gboolean retval = FALSE; /* default if job not found */
1035 ThreadData *td;
1036
1037 g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), FALSE);
1038
1039 g_rec_mutex_lock (&(wrapper->priv->rmutex));
1040
1041 td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1042 if (!td) {
1043 /* nothing to be done for this thread */
1044 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1045 return FALSE;
1046 }
1047
1048 g_async_queue_lock (wrapper->priv->to_worker_thread);
1049 for (list = td->jobs; list; list = list->next) {
1050 Job *job = JOB (list->data);
1051 if (job->job_id == id) {
1052 if (job->processed) {
1053 /* can't cancel it as it's being treated */
1054 break;
1055 }
1056
1057 retval = TRUE;
1058 job->cancelled = TRUE;
1059 if (job->arg && job->arg_destroy_func) {
1060 job->arg_destroy_func (job->arg);
1061 job->arg = NULL;
1062 }
1063 td->jobs = g_slist_delete_link (td->jobs, list);
1064 break;
1065 }
1066 }
1067 g_async_queue_unlock (wrapper->priv->to_worker_thread);
1068 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1069
1070 return retval;
1071 }
1072
1073 /**
1074 * gda_thread_wrapper_iterate:
1075 * @wrapper: a #GdaThreadWrapper object
1076 * @may_block: whether the call may block
1077 *
1078 * This method gives @wrapper a chance to check if some functions to be executed have finished
1079 * <emphasis>for the calling thread</emphasis>. In this case it handles the execution result and
1080 * makes it ready to be processed using gda_thread_wrapper_fetch_result().
1081 *
1082 * This method also allows @wrapper to handle signals which may have been emitted by objects
1083 * while in the worker thread, and call the callback function specified when gda_thread_wrapper_connect_raw()
1084 * was used.
1085 *
1086 * If @may_block is %TRUE, then it will block untill there is one finished execution
1087 * (functions returning void and signals are ignored regarding this argument).
1088 *
1089 * Since: 4.2
1090 */
1091 void
1092 gda_thread_wrapper_iterate (GdaThreadWrapper *wrapper, gboolean may_block)
1093 {
1094 ThreadData *td;
1095 Job *job;
1096
1097 g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
1098 g_return_if_fail (wrapper->priv);
1099
1100 g_rec_mutex_lock (&(wrapper->priv->rmutex));
1101 td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1102 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1103 if (!td) {
1104 /* nothing to be done for this thread */
1105 return;
1106 }
1107
1108 again:
1109 if (may_block)
1110 job = g_async_queue_pop (td->from_worker_thread);
1111 else
1112 job = g_async_queue_try_pop (td->from_worker_thread);
1113 if (job) {
1114 gboolean do_again = FALSE;
1115 td->jobs = g_slist_remove (td->jobs, job);
1116 #ifdef THREAD_WRAPPER_DEBUG
1117 g_print ("Popped job %d (type %d), for wrapper %p from thread %p\n",
1118 job->job_id, job->type, wrapper, g_thread_self ());
1119 #endif
1120
1121 if (job->type == JOB_TYPE_EXECUTE) {
1122 if (!job->func) {
1123 job_free (job); /* ignore as there is no result */
1124 do_again = TRUE;
1125 }
1126 else
1127 td->results = g_slist_append (td->results, job);
1128 }
1129 else if (job->type == JOB_TYPE_SIGNAL) {
1130 /* run callback now */
1131 SignalSpec *spec = job->u.signal.spec;
1132
1133 if (spec->callback)
1134 spec->callback (wrapper, spec->instance, ((GSignalQuery*)spec)->signal_name,
1135 job->u.signal.n_param_values, job->u.signal.param_values, NULL,
1136 spec->data);
1137 #ifdef THREAD_WRAPPER_DEBUG
1138 else
1139 g_print ("Not propagating signal %s\n", ((GSignalQuery*)spec)->signal_name);
1140 #endif
1141 job->u.signal.spec = NULL;
1142 job_free (job);
1143 signal_spec_lock (spec);
1144 signal_spec_unref (spec);
1145 do_again = TRUE;
1146 }
1147 else if (job->type == JOB_TYPE_NOTIFICATION_ERROR) {
1148 job_free (job);
1149 clean_notifications (wrapper, td);
1150 }
1151 else
1152 g_assert_not_reached ();
1153
1154 if (do_again)
1155 goto again;
1156 }
1157 }
1158
1159 /**
1160 * gda_thread_wrapper_fetch_result:
1161 * @wrapper: a #GdaThreadWrapper object
1162 * @may_lock: TRUE if this funct must lock the caller untill a result is available
1163 * @exp_id: ID of the job for which a result is expected
1164 * @error: a place to store errors, for errors which may have occurred during the execution, or %NULL
1165 *
1166 * Use this method to check if the execution of a function is finished. The function's execution must have
1167 * been requested using gda_thread_wrapper_execute().
1168 *
1169 * Returns: (transfer none) (allow-none): the pointer returned by the execution, or %NULL if no result is available
1170 *
1171 * Since: 4.2
1172 */
1173 gpointer
1174 gda_thread_wrapper_fetch_result (GdaThreadWrapper *wrapper, gboolean may_lock, guint exp_id, GError **error)
1175 {
1176 ThreadData *td;
1177 Job *job = NULL;
1178 gpointer retval = NULL;
1179
1180 g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), NULL);
1181 g_return_val_if_fail (wrapper->priv, NULL);
1182 g_return_val_if_fail (exp_id > 0, NULL);
1183
1184 g_rec_mutex_lock (&(wrapper->priv->rmutex));
1185 td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1186 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1187 if (!td) {
1188 /* nothing to be done for this thread */
1189 return NULL;
1190 }
1191
1192 do {
1193 if (td->results) {
1194 /* see if we have the result we want */
1195 GSList *list;
1196 for (list = td->results; list; list = list->next) {
1197 job = JOB (list->data);
1198 if (job->job_id == exp_id) {
1199 /* found it */
1200 td->results = g_slist_delete_link (td->results, list);
1201 if (!td->results &&
1202 !td->jobs &&
1203 (g_async_queue_length (td->from_worker_thread) == 0) &&
1204 !td->signals_list) {
1205 /* remove this ThreadData */
1206 g_rec_mutex_lock (&(wrapper->priv->rmutex));
1207 g_hash_table_remove (wrapper->priv->threads_hash, g_thread_self());
1208 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1209 }
1210 goto out;
1211 }
1212 }
1213 }
1214
1215 if (may_lock)
1216 gda_thread_wrapper_iterate (wrapper, TRUE);
1217 else {
1218 gsize len;
1219 len = g_slist_length (td->results);
1220 gda_thread_wrapper_iterate (wrapper, FALSE);
1221 if (g_slist_length (td->results) == len) {
1222 job = NULL;
1223 break;
1224 }
1225 }
1226 } while (1);
1227
1228 out:
1229 if (job) {
1230 g_assert (job->type == JOB_TYPE_EXECUTE);
1231 if (job->u.exe.error) {
1232 g_propagate_error (error, job->u.exe.error);
1233 job->u.exe.error = NULL;
1234 }
1235 retval = job->u.exe.result;
1236 job->u.exe.result = NULL;
1237 job_free (job);
1238 }
1239
1240 return retval;
1241 }
1242
1243 /**
1244 * gda_thread_wrapper_get_waiting_size:
1245 * @wrapper: a #GdaThreadWrapper object
1246 *
1247 * Use this method to query the number of functions which have been queued to be executed
1248 * but which have not yet been executed.
1249 *
1250 * Returns: the number of jobs not yet executed
1251 *
1252 * Since: 4.2
1253 */
1254 gint
1255 gda_thread_wrapper_get_waiting_size (GdaThreadWrapper *wrapper)
1256 {
1257 GSList *list;
1258 gint size = 0;
1259 ThreadData *td;
1260
1261 g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), 0);
1262 g_return_val_if_fail (wrapper->priv, 0);
1263
1264 g_rec_mutex_lock (&(wrapper->priv->rmutex));
1265 td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1266 if (!td) {
1267 /* nothing to be done for this thread */
1268 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1269 return 0;
1270 }
1271
1272 /* lock job consuming queue to avoid that the worker thread "consume" a Job */
1273 g_async_queue_lock (wrapper->priv->to_worker_thread);
1274 for (size = 0, list = td->jobs; list; list = list->next) {
1275 if (!JOB (list->data)->cancelled)
1276 size++;
1277 }
1278 g_async_queue_unlock (wrapper->priv->to_worker_thread);
1279 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1280 return size;
1281 }
1282
1283 /*
1284 * Executed in sub thread (or potentially in other threads, in which case will be ignored)
1285 * pushes data into the queue
1286 */
1287 static void
1288 worker_thread_closure_marshal (GClosure *closure,
1289 G_GNUC_UNUSED GValue *return_value,
1290 guint n_param_values,
1291 const GValue *param_values,
1292 G_GNUC_UNUSED gpointer invocation_hint,
1293 G_GNUC_UNUSED gpointer marshal_data)
1294 {
1295 SignalSpec *sigspec = (SignalSpec *) closure->data;
1296
1297 /* if the signal is not emitted from the working thread then don't do anything */
1298 if (g_thread_self () != sigspec->worker_thread)
1299 return;
1300
1301 /* check that the worker thread is working on a job for which job->reply_queue == sigspec->reply_queue */
1302 if (sigspec->private &&
1303 g_private_get (&worker_thread_current_queue) != sigspec->reply_queue)
1304 return;
1305
1306 gsize i;
1307 /*
1308 for (i = 1; i < n_param_values; i++) {
1309 g_print ("\t%d => %s\n", i, gda_value_stringify (param_values + i));
1310 }
1311 */
1312 Job *job= g_new0 (Job, 1);
1313 job->type = JOB_TYPE_SIGNAL;
1314 job->u.signal.spec = signal_spec_ref (sigspec);
1315 job->u.signal.n_param_values = n_param_values - 1;
1316 job->u.signal.param_values = g_new0 (GValue, job->u.signal.n_param_values);
1317 for (i = 1; i < n_param_values; i++) {
1318 const GValue *src;
1319 GValue *dest;
1320
1321 src = param_values + i;
1322 dest = job->u.signal.param_values + i - 1;
1323
1324 g_value_init (dest, G_VALUE_TYPE (src));
1325 g_value_copy (src, dest);
1326 }
1327
1328 Pipe *jpipe = pipe_ref (sigspec->notif);
1329 g_async_queue_push (sigspec->reply_queue, job);
1330 if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
1331 Job *je = g_new0 (Job, 1);
1332 je->type = JOB_TYPE_NOTIFICATION_ERROR;
1333 g_async_queue_push (sigspec->reply_queue, je);
1334 }
1335 signal_spec_unlock (sigspec);
1336 }
1337
1338 /*
1339 * Executed in sub thread (or potentially in other threads, in which case will be ignored)
1340 * pushes data into the queue
1341 */
1342 static void
1343 worker_thread_closure_marshal_anythread (GClosure *closure,
1344 G_GNUC_UNUSED GValue *return_value,
1345 guint n_param_values,
1346 const GValue *param_values,
1347 G_GNUC_UNUSED gpointer invocation_hint,
1348 G_GNUC_UNUSED gpointer marshal_data)
1349 {
1350 SignalSpec *sigspec = (SignalSpec *) closure->data;
1351
1352 gsize i;
1353 /*
1354 for (i = 1; i < n_param_values; i++) {
1355 g_print ("\t%d => %s\n", i, gda_value_stringify (param_values + i));
1356 }
1357 */
1358 Job *job= g_new0 (Job, 1);
1359 job->type = JOB_TYPE_SIGNAL;
1360 job->u.signal.spec = signal_spec_ref (sigspec);
1361 job->u.signal.n_param_values = n_param_values - 1;
1362 job->u.signal.param_values = g_new0 (GValue, job->u.signal.n_param_values);
1363 job->notif = NULL;
1364 for (i = 1; i < n_param_values; i++) {
1365 const GValue *src;
1366 GValue *dest;
1367
1368 src = param_values + i;
1369 dest = job->u.signal.param_values + i - 1;
1370
1371 g_value_init (dest, G_VALUE_TYPE (src));
1372 g_value_copy (src, dest);
1373 }
1374
1375 Pipe *jpipe = pipe_ref (sigspec->notif);
1376 g_async_queue_push (sigspec->reply_queue, job);
1377 if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
1378 Job *je = g_new0 (Job, 1);
1379 je->type = JOB_TYPE_NOTIFICATION_ERROR;
1380 g_async_queue_push (sigspec->reply_queue, je);
1381 }
1382 signal_spec_unlock (sigspec);
1383 }
1384
1385 /**
1386 * gda_thread_wrapper_connect_raw:
1387 * @wrapper: a #GdaThreadWrapper object
1388 * @instance: the instance to connect to
1389 * @sig_name: a string of the form "signal-name::detail"
1390 * @private_thread: set to %TRUE if @callback is to be invoked only if the signal has
1391 * been emitted while in @wrapper's private sub thread (ie. used when @wrapper is executing some functions
1392 * specified by gda_thread_wrapper_execute() or gda_thread_wrapper_execute_void()), and to %FALSE if the
1393 * callback is to be invoked whenever the signal is emitted, independently of the thread in which the
1394 * signal is emitted.
1395 * @private_job: set to %TRUE if @callback is to be invoked only if the signal has
1396 * been emitted when a job created for the calling thread is being executed, and to %FALSE
1397 * if @callback has to be called whenever the @sig_name signal is emitted by @instance. Note that
1398 * this argument is not taken into account if @private_thread is set to %FALSE.
1399 * @callback: (scope call): a #GdaThreadWrapperCallback function
1400 * @data: (closure): data to pass to @callback's calls
1401 *
1402 * Connects a callback function to a signal for a particular object. The difference with g_signal_connect() and
1403 * similar functions are:
1404 * <itemizedlist>
1405 * <listitem><para>the @callback argument is not a #GCallback function, so the callback signature is not
1406 * dependent on the signal itself</para></listitem>
1407 * <listitem><para>the signal handler must not have to return any value</para></listitem>
1408 * <listitem><para>the @callback function will be called asynchronously, the caller may need to use
1409 * gda_thread_wrapper_iterate() to get the notification</para></listitem>
1410 * <listitem><para>if @private_job and @private_thread control in which case the signal is propagated.</para></listitem>
1411 * </itemizedlist>
1412 *
1413 * Also note that signal handling is done asynchronously: when emitted in the worker thread, it
1414 * will be "queued" to be processed in the user thread when it has the chance (when gda_thread_wrapper_iterate()
1415 * is called directly or indirectly). The side effect is that the callback function is usually
1416 * called long after the object emitting the signal has finished emitting it.
1417 *
1418 * To disconnect a signal handler, don't use any of the g_signal_handler_*() functions but the
1419 * gda_thread_wrapper_disconnect() method.
1420 *
1421 * Returns: the handler ID
1422 *
1423 * Since: 4.2
1424 */
1425 gulong
1426 gda_thread_wrapper_connect_raw (GdaThreadWrapper *wrapper,
1427 gpointer instance,
1428 const gchar *sig_name,
1429 gboolean private_thread, gboolean private_job,
1430 GdaThreadWrapperCallback callback,
1431 gpointer data)
1432 {
1433 guint sigid;
1434 SignalSpec *sigspec;
1435 ThreadData *td;
1436
1437 g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), 0);
1438 g_return_val_if_fail (wrapper->priv, 0);
1439
1440 g_rec_mutex_lock (&(wrapper->priv->rmutex));
1441
1442 td = get_thread_data (wrapper, g_thread_self());
1443
1444 sigid = g_signal_lookup (sig_name, /* FIXME: use g_signal_parse_name () */
1445 G_TYPE_FROM_INSTANCE (instance));
1446 if (sigid == 0) {
1447 g_warning (_("Signal does not exist\n"));
1448 return 0;
1449 }
1450
1451 sigspec = g_new0 (SignalSpec, 1);
1452 sigspec->private = private_job;
1453 g_signal_query (sigid, (GSignalQuery*) sigspec);
1454
1455 if (((GSignalQuery*) sigspec)->return_type != G_TYPE_NONE) {
1456 g_warning (_("Signal to connect to must not have a return value\n"));
1457 g_free (sigspec);
1458 return 0;
1459 }
1460 sigspec->worker_thread = wrapper->priv->worker_thread;
1461 sigspec->reply_queue = g_async_queue_ref (td->from_worker_thread);
1462 sigspec->notif = pipe_ref (td->notif);
1463 sigspec->instance = instance;
1464 sigspec->callback = callback;
1465 sigspec->data = data;
1466 g_mutex_init (&(sigspec->mutex));
1467 sigspec->ref_count = 1;
1468
1469 GClosure *cl;
1470 cl = g_closure_new_simple (sizeof (GClosure), sigspec);
1471 if (private_thread)
1472 g_closure_set_marshal (cl, (GClosureMarshal) worker_thread_closure_marshal);
1473 else
1474 g_closure_set_marshal (cl, (GClosureMarshal) worker_thread_closure_marshal_anythread);
1475 sigspec->signal_id = g_signal_connect_closure (instance, sig_name, cl, FALSE);
1476
1477 td->signals_list = g_slist_append (td->signals_list, sigspec);
1478
1479 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1480
1481 return sigspec->signal_id;
1482 }
1483
1484 static gboolean
1485 find_signal_r_func (G_GNUC_UNUSED GThread *thread, ThreadData *td, gulong *id)
1486 {
1487 GSList *list;
1488 for (list = td->signals_list; list; list = list->next) {
1489 SignalSpec *sigspec;
1490 sigspec = (SignalSpec*) list->data;
1491 if (sigspec->signal_id == *id)
1492 return TRUE;
1493 }
1494 return FALSE;
1495 }
1496
1497 /**
1498 * gda_thread_wrapper_disconnect:
1499 * @wrapper: a #GdaThreadWrapper object
1500 * @id: a handler ID, as returned by gda_thread_wrapper_connect_raw()
1501 *
1502 * Disconnects the emission of a signal, does the opposite of gda_thread_wrapper_connect_raw().
1503 *
1504 * As soon as this method returns, the callback function set when gda_thread_wrapper_connect_raw()
1505 * was called will not be called anymore (even if the object has emitted the signal in the worker
1506 * thread and this signal has not been handled in the user thread).
1507 *
1508 * Since: 4.2
1509 */
1510 void
1511 gda_thread_wrapper_disconnect (GdaThreadWrapper *wrapper, gulong id)
1512 {
1513 SignalSpec *sigspec = NULL;
1514 ThreadData *td;
1515 GSList *list;
1516
1517 g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
1518 g_return_if_fail (wrapper->priv);
1519
1520 g_rec_mutex_lock (&(wrapper->priv->rmutex));
1521
1522 td = g_hash_table_lookup (wrapper->priv->threads_hash, g_thread_self());
1523 if (!td) {
1524 gulong theid = id;
1525 td = g_hash_table_find (wrapper->priv->threads_hash,
1526 (GHRFunc) find_signal_r_func, &theid);
1527 }
1528 if (!td) {
1529 g_warning (_("Signal %lu does not exist"), id);
1530 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1531 return;
1532 }
1533
1534 for (list = td->signals_list; list; list = list->next) {
1535 if (((SignalSpec*) list->data)->signal_id == id) {
1536 sigspec = (SignalSpec*) list->data;
1537 break;
1538 }
1539 }
1540
1541 if (!sigspec) {
1542 g_warning (_("Signal %lu does not exist"), id);
1543 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1544 return;
1545 }
1546
1547 signal_spec_lock (sigspec);
1548 #ifdef THREAD_WRAPPER_DEBUG
1549 g_print ("Disconnecting signal %s for wrapper %p\n", ((GSignalQuery*)sigspec)->signal_name, wrapper);
1550 #endif
1551 td->signals_list = g_slist_remove (td->signals_list, sigspec);
1552 g_signal_handler_disconnect (sigspec->instance, sigspec->signal_id);
1553 sigspec->instance = NULL;
1554 sigspec->signal_id = 0;
1555 g_async_queue_unref (sigspec->reply_queue);
1556 sigspec->reply_queue = NULL;
1557 sigspec->callback = NULL;
1558 sigspec->data = NULL;
1559 signal_spec_unref (sigspec);
1560
1561 if (!td->results &&
1562 !td->jobs &&
1563 (g_async_queue_length (td->from_worker_thread) == 0) &&
1564 !td->signals_list) {
1565 /* remove this ThreadData */
1566 g_hash_table_remove (wrapper->priv->threads_hash, g_thread_self());
1567 }
1568 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1569 }
1570
1571 /**
1572 * gda_thread_wrapper_steal_signal:
1573 * @wrapper: a #GdaThreadWrapper object
1574 * @id: a signal ID
1575 *
1576 * Requests that the signal which ID is @id (which has been obtained using gda_thread_wrapper_connect_raw())
1577 * be treated by the calling thread instead of by the thread in which gda_thread_wrapper_connect_raw()
1578 * was called.
1579 *
1580 * Since: 4.2
1581 */
1582
1583 void
1584 gda_thread_wrapper_steal_signal (GdaThreadWrapper *wrapper, gulong id)
1585 {
1586 ThreadData *old_td, *new_td = NULL;
1587 g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
1588 g_return_if_fail (wrapper->priv);
1589 g_return_if_fail (id > 0);
1590
1591 g_rec_mutex_lock (&(wrapper->priv->rmutex));
1592
1593 gulong theid = id;
1594 old_td = g_hash_table_find (wrapper->priv->threads_hash,
1595 (GHRFunc) find_signal_r_func, &theid);
1596 if (!old_td) {
1597 g_warning (_("Signal %lu does not exist"), id);
1598 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1599 return;
1600 }
1601
1602 if (old_td->owner == g_thread_self ()) {
1603 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1604 return;
1605 }
1606
1607 /* merge old_td and new_td */
1608 if (old_td->signals_list) {
1609 GSList *list;
1610 for (list = old_td->signals_list; list; list = list->next) {
1611 SignalSpec *sigspec = (SignalSpec*) list->data;
1612 if (sigspec->signal_id == id) {
1613 new_td = get_thread_data (wrapper, g_thread_self ());
1614 new_td->signals_list = g_slist_prepend (new_td->signals_list, sigspec);
1615 old_td->signals_list = g_slist_remove (old_td->signals_list, sigspec);
1616 g_async_queue_unref (sigspec->reply_queue);
1617 sigspec->reply_queue = g_async_queue_ref (new_td->from_worker_thread);
1618 break;
1619 }
1620 }
1621 }
1622
1623 g_rec_mutex_unlock (&(wrapper->priv->rmutex));
1624 }
1625