/**
 * MessageQueueSource:
 * @parent: the base #GSource; it is the first member so that the functions in
 *   this file can cast between #GSource and #MessageQueueSource pointers
 * @queue: (owned): the queue messages are popped from; a reference is taken
 *   in message_queue_source_new() and released when the source is finalized
 * @destroy_message: (nullable): function used by the dispatch function to
 *   free a popped message when no callback is set on the source, or %NULL
 *
 * This is a #GSource which wraps a #GAsyncQueue and is dispatched whenever a
 * message can be pulled off the queue. Messages can be enqueued from any
 * thread.
 *
 * The callbacks dispatched by a #MessageQueueSource have type
 * #MessageQueueSourceFunc.
 *
 * #MessageQueueSource supports adding a #GCancellable child source which will
 * additionally dispatch if a provided #GCancellable is cancelled.
 */
typedef struct {
  GSource         parent;
  GAsyncQueue    *queue;  /* owned */
  GDestroyNotify  destroy_message;
} MessageQueueSource;
19
/**
 * MessageQueueSourceFunc:
 * @message: (transfer full) (nullable): message pulled off the queue; the
 *   callback takes ownership and is responsible for freeing it
 * @user_data: user data provided to g_source_set_callback()
 *
 * Callback function type for #MessageQueueSource.
 *
 * The dispatch function in this file only invokes the callback after
 * successfully popping a message, so @message is not %NULL on that path.
 *
 * Returns: the value handed back from the source's dispatch function; %TRUE
 *   keeps the source around for the next message (presumably %FALSE removes
 *   it, following the usual #GSource convention)
 */
typedef gboolean (*MessageQueueSourceFunc) (gpointer message,
                                            gpointer user_data);
29
30 static gboolean
message_queue_source_prepare(GSource * source,gint * timeout_)31 message_queue_source_prepare (GSource *source,
32 gint *timeout_)
33 {
34 MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
35
36 return (g_async_queue_length (message_queue_source->queue) > 0);
37 }
38
39 static gboolean
message_queue_source_dispatch(GSource * source,GSourceFunc callback,gpointer user_data)40 message_queue_source_dispatch (GSource *source,
41 GSourceFunc callback,
42 gpointer user_data)
43 {
44 MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
45 gpointer message;
46 MessageQueueSourceFunc func = (MessageQueueSourceFunc) callback;
47
48 /* Pop a message off the queue. */
49 message = g_async_queue_try_pop (message_queue_source->queue);
50
51 /* If there was no message, bail. */
52 if (message == NULL)
53 {
54 /* Keep the source around to handle the next message. */
55 return TRUE;
56 }
57
58 /* @func may be %NULL if no callback was specified.
59 * If so, drop the message. */
60 if (func == NULL)
61 {
62 if (message_queue_source->destroy_message != NULL)
63 {
64 message_queue_source->destroy_message (message);
65 }
66
67 /* Keep the source around to consume the next message. */
68 return TRUE;
69 }
70
71 return func (message, user_data);
72 }
73
74 static void
message_queue_source_finalize(GSource * source)75 message_queue_source_finalize (GSource *source)
76 {
77 MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
78
79 g_async_queue_unref (message_queue_source->queue);
80 }
81
82 static gboolean
message_queue_source_closure_callback(gpointer message,gpointer user_data)83 message_queue_source_closure_callback (gpointer message,
84 gpointer user_data)
85 {
86 GClosure *closure = user_data;
87 GValue param_value = G_VALUE_INIT;
88 GValue result_value = G_VALUE_INIT;
89 gboolean retval;
90
91 /* The invoked function is responsible for freeing @message. */
92 g_value_init (&result_value, G_TYPE_BOOLEAN);
93 g_value_init (¶m_value, G_TYPE_POINTER);
94 g_value_set_pointer (¶m_value, message);
95
96 g_closure_invoke (closure, &result_value, 1, ¶m_value, NULL);
97 retval = g_value_get_boolean (&result_value);
98
99 g_value_unset (¶m_value);
100 g_value_unset (&result_value);
101
102 return retval;
103 }
104
105 static GSourceFuncs message_queue_source_funcs =
106 {
107 message_queue_source_prepare,
108 NULL, /* check */
109 message_queue_source_dispatch,
110 message_queue_source_finalize,
111 (GSourceFunc) message_queue_source_closure_callback,
112 NULL,
113 };
114
115 /**
116 * message_queue_source_new:
117 * @queue: the queue to check
118 * @destroy_message: (nullable): function to free a message, or %NULL
119 * @cancellable: (nullable): a #GCancellable, or %NULL
120 *
121 * Create a new #MessageQueueSource, a type of #GSource which dispatches for
122 * each message queued to it.
123 *
124 * If a callback function of type #MessageQueueSourceFunc is connected to the
125 * returned #GSource using g_source_set_callback(), it will be invoked for each
126 * message, with the message passed as its first argument. It is responsible for
127 * freeing the message. If no callback is set, messages are automatically freed
128 * as they are queued.
129 *
130 * Returns: (transfer full): a new #MessageQueueSource
131 */
132 GSource *
message_queue_source_new(GAsyncQueue * queue,GDestroyNotify destroy_message,GCancellable * cancellable)133 message_queue_source_new (GAsyncQueue *queue,
134 GDestroyNotify destroy_message,
135 GCancellable *cancellable)
136 {
137 GSource *source; /* alias of @message_queue_source */
138 MessageQueueSource *message_queue_source; /* alias of @source */
139
140 g_return_val_if_fail (queue != NULL, NULL);
141 g_return_val_if_fail (cancellable == NULL ||
142 G_IS_CANCELLABLE (cancellable), NULL);
143
144 source = g_source_new (&message_queue_source_funcs,
145 sizeof (MessageQueueSource));
146 message_queue_source = (MessageQueueSource *) source;
147
148 /* The caller can overwrite this name with something more useful later. */
149 g_source_set_name (source, "MessageQueueSource");
150
151 message_queue_source->queue = g_async_queue_ref (queue);
152 message_queue_source->destroy_message = destroy_message;
153
154 /* Add a cancellable source. */
155 if (cancellable != NULL)
156 {
157 GSource *cancellable_source;
158
159 cancellable_source = g_cancellable_source_new (cancellable);
160 g_source_set_dummy_callback (cancellable_source);
161 g_source_add_child_source (source, cancellable_source);
162 g_source_unref (cancellable_source);
163 }
164
165 return source;
166 }
167