1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-client-input-stream.c
4  *
5  * Copyright 2010-2012 Red Hat, Inc.
6  */
7 
8 #ifdef HAVE_CONFIG_H
9 #include <config.h>
10 #endif
11 
12 #include "soup-client-input-stream.h"
13 #include "soup.h"
14 #include "soup-message-private.h"
15 
16 struct _SoupClientInputStreamPrivate {
17 	SoupMessage  *msg;
18 };
19 
20 enum {
21 	SIGNAL_EOF,
22 	LAST_SIGNAL
23 };
24 
25 static guint signals[LAST_SIGNAL] = { 0 };
26 
/* GObject property ids; 0 is reserved, so real properties start at 1. */
enum {
	PROP_0 = 0,

	PROP_MESSAGE	/* the "message" property */
};
32 
33 static GPollableInputStreamInterface *soup_client_input_stream_parent_pollable_interface;
34 static void soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
35 
G_DEFINE_TYPE_WITH_CODE(SoupClientInputStream,soup_client_input_stream,SOUP_TYPE_FILTER_INPUT_STREAM,G_ADD_PRIVATE (SoupClientInputStream)G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,soup_client_input_stream_pollable_init))36 G_DEFINE_TYPE_WITH_CODE (SoupClientInputStream, soup_client_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
37                          G_ADD_PRIVATE (SoupClientInputStream)
38 			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
39 						soup_client_input_stream_pollable_init))
40 
41 static void
42 soup_client_input_stream_init (SoupClientInputStream *stream)
43 {
44 	stream->priv = soup_client_input_stream_get_instance_private (stream);
45 }
46 
47 static void
soup_client_input_stream_finalize(GObject * object)48 soup_client_input_stream_finalize (GObject *object)
49 {
50 	SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
51 
52 	g_clear_object (&cistream->priv->msg);
53 
54 	G_OBJECT_CLASS (soup_client_input_stream_parent_class)->finalize (object);
55 }
56 
57 static void
soup_client_input_stream_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)58 soup_client_input_stream_set_property (GObject *object, guint prop_id,
59 				       const GValue *value, GParamSpec *pspec)
60 {
61 	SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
62 
63 	switch (prop_id) {
64 	case PROP_MESSAGE:
65 		cistream->priv->msg = g_value_dup_object (value);
66 		break;
67 	default:
68 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
69 		break;
70 	}
71 }
72 
73 static void
soup_client_input_stream_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)74 soup_client_input_stream_get_property (GObject *object, guint prop_id,
75 				       GValue *value, GParamSpec *pspec)
76 {
77 	SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
78 
79 	switch (prop_id) {
80 	case PROP_MESSAGE:
81 		g_value_set_object (value, cistream->priv->msg);
82 		break;
83 	default:
84 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
85 		break;
86 	}
87 }
88 
89 static gssize
soup_client_input_stream_read_fn(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)90 soup_client_input_stream_read_fn (GInputStream  *stream,
91 				  void          *buffer,
92 				  gsize          count,
93 				  GCancellable  *cancellable,
94 				  GError       **error)
95 {
96 	gssize nread;
97 
98 	nread = G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
99 		read_fn (stream, buffer, count, cancellable, error);
100 
101 	if (nread == 0)
102 		g_signal_emit (stream, signals[SIGNAL_EOF], 0);
103 
104 	return nread;
105 }
106 
107 static gssize
soup_client_input_stream_read_nonblocking(GPollableInputStream * stream,void * buffer,gsize count,GError ** error)108 soup_client_input_stream_read_nonblocking (GPollableInputStream  *stream,
109 					   void                  *buffer,
110 					   gsize                  count,
111 					   GError               **error)
112 {
113 	gssize nread;
114 
115 	nread = soup_client_input_stream_parent_pollable_interface->
116 		read_nonblocking (stream, buffer, count, error);
117 
118 	if (nread == 0)
119 		g_signal_emit (stream, signals[SIGNAL_EOF], 0);
120 
121 	return nread;
122 }
123 
124 static gboolean
soup_client_input_stream_close_fn(GInputStream * stream,GCancellable * cancellable,GError ** error)125 soup_client_input_stream_close_fn (GInputStream  *stream,
126 				   GCancellable  *cancellable,
127 				   GError       **error)
128 {
129 	SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
130 	gboolean success;
131 
132 	success = soup_message_io_run_until_finish (cistream->priv->msg, TRUE,
133 						    NULL, error);
134 	soup_message_io_finished (cistream->priv->msg);
135 	return success;
136 }
137 
138 static gboolean
idle_finish_close(gpointer user_data)139 idle_finish_close (gpointer user_data)
140 {
141 	GTask *task = user_data;
142 
143 	g_task_return_boolean (task, TRUE);
144 	g_object_unref (task);
145 	return FALSE;
146 }
147 
148 static gboolean
close_async_ready(SoupMessage * msg,gpointer user_data)149 close_async_ready (SoupMessage *msg, gpointer user_data)
150 {
151 	GTask *task = user_data;
152 	SoupClientInputStream *cistream = g_task_get_source_object (task);
153 	GError *error = NULL;
154 
155 	if (!soup_message_io_run_until_finish (cistream->priv->msg, FALSE,
156 					       g_task_get_cancellable (task),
157 					       &error) &&
158 	    g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
159 		g_error_free (error);
160 		return TRUE;
161 	}
162 
163 	soup_message_io_finished (cistream->priv->msg);
164 
165 	if (error) {
166 		g_task_return_error (task, error);
167 		g_object_unref (task);
168 		return FALSE;
169 	}
170 
171 	/* Due to a historical accident, SoupSessionAsync relies on us
172 	 * waiting one extra cycle after run_until_finish() returns.
173 	 * Ugh. FIXME later when it's easier to do.
174 	 */
175 	soup_add_idle (g_main_context_get_thread_default (),
176 		       idle_finish_close, task);
177 	return FALSE;
178 }
179 
180 static void
soup_client_input_stream_close_async(GInputStream * stream,gint priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer user_data)181 soup_client_input_stream_close_async (GInputStream        *stream,
182 				      gint                 priority,
183 				      GCancellable        *cancellable,
184 				      GAsyncReadyCallback  callback,
185 				      gpointer             user_data)
186 {
187 	SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
188 	GTask *task;
189 	GSource *source;
190 
191 	task = g_task_new (stream, cancellable, callback, user_data);
192 	g_task_set_priority (task, priority);
193 
194 	if (close_async_ready (cistream->priv->msg, task) == G_SOURCE_CONTINUE) {
195 		source = soup_message_io_get_source (cistream->priv->msg,
196 						     cancellable, NULL, NULL);
197 
198 		g_task_attach_source (task, source, (GSourceFunc) close_async_ready);
199 		g_source_unref (source);
200 	}
201 }
202 
203 static gboolean
soup_client_input_stream_close_finish(GInputStream * stream,GAsyncResult * result,GError ** error)204 soup_client_input_stream_close_finish (GInputStream  *stream,
205 				       GAsyncResult  *result,
206 				       GError       **error)
207 {
208 	return g_task_propagate_boolean (G_TASK (result), error);
209 }
210 
211 static void
soup_client_input_stream_class_init(SoupClientInputStreamClass * stream_class)212 soup_client_input_stream_class_init (SoupClientInputStreamClass *stream_class)
213 {
214 	GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
215 	GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
216 
217 	object_class->finalize = soup_client_input_stream_finalize;
218 	object_class->set_property = soup_client_input_stream_set_property;
219 	object_class->get_property = soup_client_input_stream_get_property;
220 
221 	input_stream_class->read_fn = soup_client_input_stream_read_fn;
222 	input_stream_class->close_fn = soup_client_input_stream_close_fn;
223 	input_stream_class->close_async = soup_client_input_stream_close_async;
224 	input_stream_class->close_finish = soup_client_input_stream_close_finish;
225 
226 	signals[SIGNAL_EOF] =
227 		g_signal_new ("eof",
228 			      G_OBJECT_CLASS_TYPE (object_class),
229 			      G_SIGNAL_RUN_LAST,
230 			      0,
231 			      NULL, NULL,
232 			      NULL,
233 			      G_TYPE_NONE, 0);
234 
235 	g_object_class_install_property (
236 		object_class, PROP_MESSAGE,
237 		g_param_spec_object ("message",
238 				     "Message",
239 				     "Message",
240 				     SOUP_TYPE_MESSAGE,
241 				     G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
242 				     G_PARAM_STATIC_STRINGS));
243 }
244 
245 static void
soup_client_input_stream_pollable_init(GPollableInputStreamInterface * pollable_interface,gpointer interface_data)246 soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
247 					gpointer interface_data)
248 {
249 	soup_client_input_stream_parent_pollable_interface =
250 		g_type_interface_peek_parent (pollable_interface);
251 
252 	pollable_interface->read_nonblocking = soup_client_input_stream_read_nonblocking;
253 }
254 
255 GInputStream *
soup_client_input_stream_new(GInputStream * base_stream,SoupMessage * msg)256 soup_client_input_stream_new (GInputStream *base_stream,
257 			      SoupMessage  *msg)
258 {
259 	return g_object_new (SOUP_TYPE_CLIENT_INPUT_STREAM,
260 			     "base-stream", base_stream,
261 			     "message", msg,
262 			     NULL);
263 }
264