1 /**
2  * MessageQueueSource:
3  *
4  * This is a #GSource which wraps a #GAsyncQueue and is dispatched whenever a
5  * message can be pulled off the queue. Messages can be enqueued from any
6  * thread.
7  *
8  * The callbacks dispatched by a #MessageQueueSource have type
9  * #MessageQueueSourceFunc.
10  *
11  * #MessageQueueSource supports adding a #GCancellable child source which will
12  * additionally dispatch if a provided #GCancellable is cancelled.
13  */
14 typedef struct {
15   GSource         parent;
16   GAsyncQueue    *queue;  /* owned */
17   GDestroyNotify  destroy_message;
18 } MessageQueueSource;
19 
20 /**
21  * MessageQueueSourceFunc:
22  * @message: (transfer full) (nullable): message pulled off the queue
23  * @user_data: user data provided to g_source_set_callback()
24  *
25  * Callback function type for #MessageQueueSource.
26  */
27 typedef gboolean (*MessageQueueSourceFunc) (gpointer message,
28                                             gpointer user_data);
29 
30 static gboolean
message_queue_source_prepare(GSource * source,gint * timeout_)31 message_queue_source_prepare (GSource *source,
32                               gint    *timeout_)
33 {
34   MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
35 
36   return (g_async_queue_length (message_queue_source->queue) > 0);
37 }
38 
39 static gboolean
message_queue_source_dispatch(GSource * source,GSourceFunc callback,gpointer user_data)40 message_queue_source_dispatch (GSource     *source,
41                                GSourceFunc  callback,
42                                gpointer     user_data)
43 {
44   MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
45   gpointer message;
46   MessageQueueSourceFunc func = (MessageQueueSourceFunc) callback;
47 
48   /* Pop a message off the queue. */
49   message = g_async_queue_try_pop (message_queue_source->queue);
50 
51   /* If there was no message, bail. */
52   if (message == NULL)
53     {
54       /* Keep the source around to handle the next message. */
55       return TRUE;
56     }
57 
58   /* @func may be %NULL if no callback was specified.
59    * If so, drop the message. */
60   if (func == NULL)
61     {
62       if (message_queue_source->destroy_message != NULL)
63         {
64           message_queue_source->destroy_message (message);
65         }
66 
67       /* Keep the source around to consume the next message. */
68       return TRUE;
69     }
70 
71   return func (message, user_data);
72 }
73 
74 static void
message_queue_source_finalize(GSource * source)75 message_queue_source_finalize (GSource *source)
76 {
77   MessageQueueSource *message_queue_source = (MessageQueueSource *) source;
78 
79   g_async_queue_unref (message_queue_source->queue);
80 }
81 
82 static gboolean
message_queue_source_closure_callback(gpointer message,gpointer user_data)83 message_queue_source_closure_callback (gpointer message,
84                                        gpointer user_data)
85 {
86   GClosure *closure = user_data;
87   GValue param_value = G_VALUE_INIT;
88   GValue result_value = G_VALUE_INIT;
89   gboolean retval;
90 
91   /* The invoked function is responsible for freeing @message. */
92   g_value_init (&result_value, G_TYPE_BOOLEAN);
93   g_value_init (&param_value, G_TYPE_POINTER);
94   g_value_set_pointer (&param_value, message);
95 
96   g_closure_invoke (closure, &result_value, 1, &param_value, NULL);
97   retval = g_value_get_boolean (&result_value);
98 
99   g_value_unset (&param_value);
100   g_value_unset (&result_value);
101 
102   return retval;
103 }
104 
105 static GSourceFuncs message_queue_source_funcs =
106   {
107     message_queue_source_prepare,
108     NULL,  /* check */
109     message_queue_source_dispatch,
110     message_queue_source_finalize,
111     (GSourceFunc) message_queue_source_closure_callback,
112     NULL,
113   };
114 
115 /**
116  * message_queue_source_new:
117  * @queue: the queue to check
118  * @destroy_message: (nullable): function to free a message, or %NULL
119  * @cancellable: (nullable): a #GCancellable, or %NULL
120  *
121  * Create a new #MessageQueueSource, a type of #GSource which dispatches for
122  * each message queued to it.
123  *
124  * If a callback function of type #MessageQueueSourceFunc is connected to the
125  * returned #GSource using g_source_set_callback(), it will be invoked for each
126  * message, with the message passed as its first argument. It is responsible for
127  * freeing the message. If no callback is set, messages are automatically freed
128  * as they are queued.
129  *
130  * Returns: (transfer full): a new #MessageQueueSource
131  */
132 GSource *
message_queue_source_new(GAsyncQueue * queue,GDestroyNotify destroy_message,GCancellable * cancellable)133 message_queue_source_new (GAsyncQueue    *queue,
134                           GDestroyNotify  destroy_message,
135                           GCancellable   *cancellable)
136 {
137   GSource *source;  /* alias of @message_queue_source */
138   MessageQueueSource *message_queue_source;  /* alias of @source */
139 
140   g_return_val_if_fail (queue != NULL, NULL);
141   g_return_val_if_fail (cancellable == NULL ||
142                         G_IS_CANCELLABLE (cancellable), NULL);
143 
144   source = g_source_new (&message_queue_source_funcs,
145                          sizeof (MessageQueueSource));
146   message_queue_source = (MessageQueueSource *) source;
147 
148   /* The caller can overwrite this name with something more useful later. */
149   g_source_set_name (source, "MessageQueueSource");
150 
151   message_queue_source->queue = g_async_queue_ref (queue);
152   message_queue_source->destroy_message = destroy_message;
153 
154   /* Add a cancellable source. */
155   if (cancellable != NULL)
156     {
157       GSource *cancellable_source;
158 
159       cancellable_source = g_cancellable_source_new (cancellable);
160       g_source_set_dummy_callback (cancellable_source);
161       g_source_add_child_source (source, cancellable_source);
162       g_source_unref (cancellable_source);
163     }
164 
165   return source;
166 }
167