1 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
2 /*
3  * soup-multipart-input-stream.c
4  *
5  * Copyright (C) 2012 Collabora Ltd.
6  */
7 
8 #ifdef HAVE_CONFIG_H
9 #include <config.h>
10 #endif
11 
12 #include <string.h>
13 
14 #include "soup-body-input-stream.h"
15 #include "soup-filter-input-stream.h"
16 #include "soup-enum-types.h"
17 #include "soup-message.h"
18 #include "soup-message-private.h"
19 #include "soup-multipart-input-stream.h"
20 
21 #define RESPONSE_BLOCK_SIZE 8192
22 
23 /**
24  * SECTION:soup-multipart-input-stream
25  * @short_description: Multipart input handling stream
26  *
27  * This adds support for the multipart responses. For handling the
28  * multiple parts the user needs to wrap the #GInputStream obtained by
29  * sending the request with a #SoupMultipartInputStream and use
30  * soup_multipart_input_stream_next_part() before reading. Responses
31  * which are not wrapped will be treated like non-multipart responses.
32  *
33  * Note that although #SoupMultipartInputStream is a #GInputStream,
34  * you should not read directly from it, and the results are undefined
35  * if you do.
36  *
37  * Since: 2.40
38  **/
39 
/* Property identifiers used by the set_property/get_property handlers. */
enum {
	PROP_0 = 0,	/* placeholder; never installed as a property in this file */

	PROP_MESSAGE	/* the "message" property */
};
45 
46 struct _SoupMultipartInputStreamPrivate {
47 	SoupMessage	        *msg;
48 
49 	gboolean	         done_with_part;
50 
51 	GByteArray	        *meta_buf;
52 	SoupMessageHeaders      *current_headers;
53 
54 	SoupFilterInputStream   *base_stream;
55 
56 	char		        *boundary;
57 	gsize		         boundary_size;
58 
59 	goffset		        remaining_bytes;
60 };
61 
62 static void soup_multipart_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
63 
G_DEFINE_TYPE_WITH_CODE(SoupMultipartInputStream,soup_multipart_input_stream,G_TYPE_FILTER_INPUT_STREAM,G_ADD_PRIVATE (SoupMultipartInputStream)G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,soup_multipart_input_stream_pollable_init))64 G_DEFINE_TYPE_WITH_CODE (SoupMultipartInputStream, soup_multipart_input_stream, G_TYPE_FILTER_INPUT_STREAM,
65                          G_ADD_PRIVATE (SoupMultipartInputStream)
66 			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
67 						soup_multipart_input_stream_pollable_init))
68 
69 static void
70 soup_multipart_input_stream_dispose (GObject *object)
71 {
72 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);
73 
74 	g_clear_object (&multipart->priv->msg);
75 	g_clear_object (&multipart->priv->base_stream);
76 
77 	G_OBJECT_CLASS (soup_multipart_input_stream_parent_class)->dispose (object);
78 }
79 
80 static void
soup_multipart_input_stream_finalize(GObject * object)81 soup_multipart_input_stream_finalize (GObject *object)
82 {
83 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);
84 
85 	g_free (multipart->priv->boundary);
86 
87 	if (multipart->priv->meta_buf)
88 		g_clear_pointer (&multipart->priv->meta_buf, g_byte_array_unref);
89 
90 	G_OBJECT_CLASS (soup_multipart_input_stream_parent_class)->finalize (object);
91 }
92 
93 static void
soup_multipart_input_stream_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)94 soup_multipart_input_stream_set_property (GObject *object, guint prop_id,
95 					  const GValue *value, GParamSpec *pspec)
96 {
97 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);
98 
99 	switch (prop_id) {
100 	case PROP_MESSAGE:
101 		multipart->priv->msg = g_value_dup_object (value);
102 		break;
103 	default:
104 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
105 		break;
106 	}
107 }
108 
109 static void
soup_multipart_input_stream_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)110 soup_multipart_input_stream_get_property (GObject *object, guint prop_id,
111 					  GValue *value, GParamSpec *pspec)
112 {
113 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);
114 
115 	switch (prop_id) {
116 	case PROP_MESSAGE:
117 		g_value_set_object (value, multipart->priv->msg);
118 		break;
119 	default:
120 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
121 		break;
122 	}
123 }
124 
125 static gssize
soup_multipart_input_stream_read_real(GInputStream * stream,void * buffer,gsize count,gboolean blocking,GCancellable * cancellable,GError ** error)126 soup_multipart_input_stream_read_real (GInputStream	*stream,
127 				       void		*buffer,
128 				       gsize		 count,
129 				       gboolean          blocking,
130 				       GCancellable	*cancellable,
131 				       GError          **error)
132 {
133 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (stream);
134 	SoupMultipartInputStreamPrivate *priv = multipart->priv;
135 	gboolean got_boundary = FALSE;
136 	gssize nread = 0;
137 	guint8 *buf;
138 
139 	g_return_val_if_fail (priv->boundary != NULL, -1);
140 
141 	/* If we have received a Content-Length, and are not yet close to the end of
142 	 * the part, let's not look for the boundary for now. This optimization is
143 	 * necessary for keeping CPU usage civil.
144 	 */
145 	if (priv->remaining_bytes > priv->boundary_size) {
146 		goffset bytes_to_read = MIN (count, priv->remaining_bytes - priv->boundary_size);
147 
148 		nread = g_pollable_stream_read (G_INPUT_STREAM (priv->base_stream),
149 						buffer, bytes_to_read, blocking,
150 						cancellable, error);
151 
152 		if (nread > 0)
153 			priv->remaining_bytes -= nread;
154 
155 		return nread;
156 	}
157 
158 	if (priv->done_with_part)
159 		return 0;
160 
161 	nread = soup_filter_input_stream_read_until (priv->base_stream, buffer, count,
162 						     priv->boundary, priv->boundary_size,
163 						     blocking, FALSE, &got_boundary,
164 						     cancellable, error);
165 
166 	if (nread <= 0)
167 		return nread;
168 
169 	if (!got_boundary)
170 		return nread;
171 
172 	priv->done_with_part = TRUE;
173 
174 	/* Ignore the newline that preceded the boundary. */
175 	if (nread == 1) {
176 		buf = ((guint8*)buffer);
177 		if (!memcmp (buf, "\n", 1))
178 			nread -= 1;
179 	} else {
180 		buf = ((guint8*)buffer) + nread - 2;
181 		if (!memcmp (buf, "\r\n", 2))
182 			nread -= 2;
183 		else if (!memcmp (buf, "\n", 1))
184 			nread -= 1;
185 	}
186 
187 	return nread;
188 }
189 
190 static gssize
soup_multipart_input_stream_read(GInputStream * stream,void * buffer,gsize count,GCancellable * cancellable,GError ** error)191 soup_multipart_input_stream_read (GInputStream	*stream,
192 				  void		*buffer,
193 				  gsize		 count,
194 				  GCancellable	*cancellable,
195 				  GError       **error)
196 {
197 	return soup_multipart_input_stream_read_real (stream, buffer, count,
198 						      TRUE, cancellable, error);
199 }
200 
201 static void
soup_multipart_input_stream_init(SoupMultipartInputStream * multipart)202 soup_multipart_input_stream_init (SoupMultipartInputStream *multipart)
203 {
204 	SoupMultipartInputStreamPrivate *priv;
205 	priv = multipart->priv = soup_multipart_input_stream_get_instance_private (multipart);
206 
207 	priv->meta_buf = g_byte_array_sized_new (RESPONSE_BLOCK_SIZE);
208 	priv->done_with_part = FALSE;
209 }
210 
211 static void
soup_multipart_input_stream_constructed(GObject * object)212 soup_multipart_input_stream_constructed (GObject *object)
213 {
214 	SoupMultipartInputStream *multipart;
215 	SoupMultipartInputStreamPrivate *priv;
216 	GInputStream *base_stream;
217 	const char* boundary;
218 	GHashTable *params = NULL;
219 
220 	multipart = SOUP_MULTIPART_INPUT_STREAM (object);
221 	priv = multipart->priv;
222 
223 	base_stream = G_FILTER_INPUT_STREAM (multipart)->base_stream;
224 	priv->base_stream = SOUP_FILTER_INPUT_STREAM (soup_filter_input_stream_new (base_stream));
225 
226 	soup_message_headers_get_content_type (priv->msg->response_headers,
227 					       &params);
228 
229 	boundary = g_hash_table_lookup (params, "boundary");
230 	if (boundary) {
231 		if (g_str_has_prefix (boundary, "--"))
232 			priv->boundary = g_strdup (boundary);
233 		else
234 			priv->boundary = g_strdup_printf ("--%s", boundary);
235 
236 		priv->boundary_size = strlen (priv->boundary);
237 	} else {
238 		g_warning ("No boundary found in message tagged as multipart.");
239 	}
240 
241 	g_hash_table_destroy (params);
242 
243 	if (G_OBJECT_CLASS (soup_multipart_input_stream_parent_class)->constructed)
244 		G_OBJECT_CLASS (soup_multipart_input_stream_parent_class)->constructed (object);
245 }
246 
247 static gboolean
soup_multipart_input_stream_is_readable(GPollableInputStream * stream)248 soup_multipart_input_stream_is_readable (GPollableInputStream *stream)
249 {
250 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (stream);
251 	SoupMultipartInputStreamPrivate *priv = multipart->priv;
252 
253 	return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (priv->base_stream));
254 }
255 
256 static gssize
soup_multipart_input_stream_read_nonblocking(GPollableInputStream * stream,void * buffer,gsize count,GError ** error)257 soup_multipart_input_stream_read_nonblocking (GPollableInputStream  *stream,
258 					      void                  *buffer,
259 					      gsize                  count,
260 					      GError               **error)
261 {
262 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (stream);
263 
264 	return soup_multipart_input_stream_read_real (G_INPUT_STREAM (multipart),
265 						      buffer, count,
266 						      FALSE, NULL, error);
267 }
268 
269 static GSource *
soup_multipart_input_stream_create_source(GPollableInputStream * stream,GCancellable * cancellable)270 soup_multipart_input_stream_create_source (GPollableInputStream *stream,
271 					   GCancellable         *cancellable)
272 {
273 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (stream);
274 	SoupMultipartInputStreamPrivate *priv = multipart->priv;
275 	GSource *base_source, *pollable_source;
276 
277 	base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (priv->base_stream), cancellable);
278 
279 	pollable_source = g_pollable_source_new_full (stream, base_source, cancellable);
280 	g_source_unref (base_source);
281 
282 	return pollable_source;
283 }
284 
285 static void
soup_multipart_input_stream_class_init(SoupMultipartInputStreamClass * multipart_class)286 soup_multipart_input_stream_class_init (SoupMultipartInputStreamClass *multipart_class)
287 {
288 	GObjectClass *object_class = G_OBJECT_CLASS (multipart_class);
289 	GInputStreamClass *input_stream_class =
290 		G_INPUT_STREAM_CLASS (multipart_class);
291 
292 	object_class->dispose = soup_multipart_input_stream_dispose;
293 	object_class->finalize = soup_multipart_input_stream_finalize;
294 	object_class->constructed = soup_multipart_input_stream_constructed;
295 	object_class->set_property = soup_multipart_input_stream_set_property;
296 	object_class->get_property = soup_multipart_input_stream_get_property;
297 
298 	input_stream_class->read_fn = soup_multipart_input_stream_read;
299 
300 	g_object_class_install_property (
301 		object_class, PROP_MESSAGE,
302 		g_param_spec_object ("message",
303 				     "Message",
304 				     "The SoupMessage",
305 				     SOUP_TYPE_MESSAGE,
306 				     G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
307 				     G_PARAM_STATIC_STRINGS));
308 
309 }
310 
311 static void
soup_multipart_input_stream_pollable_init(GPollableInputStreamInterface * pollable_interface,gpointer interface_data)312 soup_multipart_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
313 					   gpointer                       interface_data)
314 {
315 	pollable_interface->is_readable = soup_multipart_input_stream_is_readable;
316 	pollable_interface->read_nonblocking = soup_multipart_input_stream_read_nonblocking;
317 	pollable_interface->create_source = soup_multipart_input_stream_create_source;
318 }
319 
320 static void
soup_multipart_input_stream_parse_headers(SoupMultipartInputStream * multipart)321 soup_multipart_input_stream_parse_headers (SoupMultipartInputStream *multipart)
322 {
323 	SoupMultipartInputStreamPrivate *priv = multipart->priv;
324 	gboolean success;
325 
326 	priv->current_headers = soup_message_headers_new (SOUP_MESSAGE_HEADERS_MULTIPART);
327 
328 	/* The part lacks headers, but is there. */
329 	if (!priv->meta_buf->len)
330 		return;
331 
332 	success = soup_headers_parse ((const char*) priv->meta_buf->data,
333 				      (int) priv->meta_buf->len,
334 				      priv->current_headers);
335 
336 	if (success)
337 		priv->remaining_bytes = soup_message_headers_get_content_length (priv->current_headers);
338 	else
339 		g_clear_pointer (&priv->current_headers, soup_message_headers_free);
340 
341 	g_byte_array_remove_range (priv->meta_buf, 0, priv->meta_buf->len);
342 }
343 
344 static gboolean
soup_multipart_input_stream_read_headers(SoupMultipartInputStream * multipart,GCancellable * cancellable,GError ** error)345 soup_multipart_input_stream_read_headers (SoupMultipartInputStream  *multipart,
346 					  GCancellable		    *cancellable,
347 					  GError		   **error)
348 {
349 	SoupMultipartInputStreamPrivate *priv = multipart->priv;
350 	guchar read_buf[RESPONSE_BLOCK_SIZE];
351 	guchar *buf;
352 	gboolean got_boundary = FALSE;
353 	gboolean got_lf = FALSE;
354 	gssize nread = 0;
355 
356 	g_return_val_if_fail (priv->boundary != NULL, TRUE);
357 
358 	g_clear_pointer (&priv->current_headers, soup_message_headers_free);
359 
360 	while (1) {
361 		nread = soup_filter_input_stream_read_line (priv->base_stream, read_buf, sizeof (read_buf),
362 							    /* blocking */ TRUE, &got_lf, cancellable, error);
363 
364 		if (nread <= 0)
365 			break;
366 
367 		g_byte_array_append (priv->meta_buf, read_buf, nread);
368 
369 		/* Need to do this boundary check before checking for the line feed, since we
370 		 * may get the multipart end indicator without getting a new line.
371 		 */
372 		if (!got_boundary &&
373 		    !strncmp ((char *)priv->meta_buf->data,
374 			      priv->boundary,
375 			      priv->boundary_size)) {
376 			got_boundary = TRUE;
377 
378 			/* Now check for possible multipart termination. */
379 			buf = &read_buf[nread - 4];
380 			if ((nread >= 4 && !memcmp (buf, "--\r\n", 4)) ||
381 			    (nread >= 3 && !memcmp (buf + 1, "--\n", 3)) ||
382 			    (nread >= 3 && !memcmp (buf + 2, "--", 2))) {
383 				g_byte_array_set_size (priv->meta_buf, 0);
384 				return FALSE;
385 			}
386 		}
387 
388 		g_return_val_if_fail (got_lf, FALSE);
389 
390 		/* Discard pre-boundary lines. */
391 		if (!got_boundary) {
392 			g_byte_array_set_size (priv->meta_buf, 0);
393 			continue;
394 		}
395 
396 		if (nread == 1 &&
397 		    priv->meta_buf->len >= 2 &&
398 		    !strncmp ((char *)priv->meta_buf->data +
399 			      priv->meta_buf->len - 2,
400 			      "\n\n", 2))
401 			break;
402 		else if (nread == 2 &&
403 			 priv->meta_buf->len >= 3 &&
404 			 !strncmp ((char *)priv->meta_buf->data +
405 				   priv->meta_buf->len - 3,
406 				   "\n\r\n", 3))
407 			break;
408 	}
409 
410 	return TRUE;
411 }
412 
413 /* Public APIs */
414 
415 /**
416  * soup_multipart_input_stream_new:
417  * @msg: the #SoupMessage the response is related to.
418  * @base_stream: the #GInputStream returned by sending the request.
419  *
420  * Creates a new #SoupMultipartInputStream that wraps the
421  * #GInputStream obtained by sending the #SoupRequest. Reads should
422  * not be done directly through this object, use the input streams
423  * returned by soup_multipart_input_stream_next_part() or its async
424  * counterpart instead.
425  *
426  * Returns: a new #SoupMultipartInputStream
427  *
428  * Since: 2.40
429  **/
430 SoupMultipartInputStream *
soup_multipart_input_stream_new(SoupMessage * msg,GInputStream * base_stream)431 soup_multipart_input_stream_new (SoupMessage  *msg,
432 				 GInputStream *base_stream)
433 {
434 	return g_object_new (SOUP_TYPE_MULTIPART_INPUT_STREAM,
435 			     "message", msg,
436 			     "base-stream", base_stream,
437 			     NULL);
438 }
439 
440 /**
441  * soup_multipart_input_stream_next_part:
442  * @multipart: the #SoupMultipartInputStream
443  * @cancellable: a #GCancellable
444  * @error: a #GError
445  *
446  * Obtains an input stream for the next part. When dealing with a
447  * multipart response the input stream needs to be wrapped in a
448  * #SoupMultipartInputStream and this function or its async
 * counterpart needs to be called to obtain the first part for
450  * reading.
451  *
452  * After calling this function,
453  * soup_multipart_input_stream_get_headers() can be used to obtain the
454  * headers for the first part. A read of 0 bytes indicates the end of
455  * the part; a new call to this function should be done at that point,
456  * to obtain the next part.
457  *
458  * Return value: (nullable) (transfer full): a new #GInputStream, or
459  * %NULL if there are no more parts
460  *
461  * Since: 2.40
462  */
463 GInputStream *
soup_multipart_input_stream_next_part(SoupMultipartInputStream * multipart,GCancellable * cancellable,GError ** error)464 soup_multipart_input_stream_next_part (SoupMultipartInputStream  *multipart,
465 				       GCancellable	         *cancellable,
466 				       GError                   **error)
467 {
468 	if (!soup_multipart_input_stream_read_headers (multipart, cancellable, error))
469 		return NULL;
470 
471 	soup_multipart_input_stream_parse_headers (multipart);
472 
473 	multipart->priv->done_with_part = FALSE;
474 
475 	return G_INPUT_STREAM (g_object_new (SOUP_TYPE_BODY_INPUT_STREAM,
476 					     "base-stream", G_INPUT_STREAM (multipart),
477 					     "close-base-stream", FALSE,
478 					     "encoding", SOUP_ENCODING_EOF,
479 					     NULL));
480 
481 }
482 
483 static void
soup_multipart_input_stream_next_part_thread(GTask * task,gpointer object,gpointer task_data,GCancellable * cancellable)484 soup_multipart_input_stream_next_part_thread (GTask        *task,
485 					      gpointer      object,
486 					      gpointer      task_data,
487 					      GCancellable *cancellable)
488 {
489 	SoupMultipartInputStream *multipart = SOUP_MULTIPART_INPUT_STREAM (object);
490 	GError *error = NULL;
491 	GInputStream *new_stream;
492 
493 	new_stream = soup_multipart_input_stream_next_part (multipart, cancellable, &error);
494 
495 	g_input_stream_clear_pending (G_INPUT_STREAM (multipart));
496 
497 	if (error)
498 		g_task_return_error (task, error);
499 	else
500 		g_task_return_pointer (task, new_stream, g_object_unref);
501 }
502 
503 /**
504  * soup_multipart_input_stream_next_part_async:
505  * @multipart: the #SoupMultipartInputStream.
506  * @io_priority: the I/O priority for the request.
507  * @cancellable: a #GCancellable.
508  * @callback: callback to call when request is satisfied.
509  * @data: data for @callback
510  *
 * Obtains a #GInputStream for the next part. See
512  * soup_multipart_input_stream_next_part() for details on the
513  * workflow.
514  *
515  * Since: 2.40
516  */
517 void
soup_multipart_input_stream_next_part_async(SoupMultipartInputStream * multipart,int io_priority,GCancellable * cancellable,GAsyncReadyCallback callback,gpointer data)518 soup_multipart_input_stream_next_part_async (SoupMultipartInputStream *multipart,
519 					     int                       io_priority,
520 					     GCancellable	      *cancellable,
521 					     GAsyncReadyCallback       callback,
522 					     gpointer		       data)
523 {
524 	GInputStream *stream = G_INPUT_STREAM (multipart);
525 	GTask *task;
526 	GError *error = NULL;
527 
528 	g_return_if_fail (SOUP_IS_MULTIPART_INPUT_STREAM (multipart));
529 
530 	task = g_task_new (multipart, cancellable, callback, data);
531 	g_task_set_priority (task, io_priority);
532 
533 	if (!g_input_stream_set_pending (stream, &error)) {
534 		g_task_return_error (task, error);
535 		g_object_unref (task);
536 		return;
537 	}
538 
539 	g_task_run_in_thread (task, soup_multipart_input_stream_next_part_thread);
540 	g_object_unref (task);
541 }
542 
543 /**
544  * soup_multipart_input_stream_next_part_finish:
545  * @multipart: a #SoupMultipartInputStream.
546  * @result: a #GAsyncResult.
547  * @error: a #GError location to store any error, or %NULL to ignore.
548  *
549  * Finishes an asynchronous request for the next part.
550  *
551  * Return value: (nullable) (transfer full): a newly created
552  * #GInputStream for reading the next part or %NULL if there are no
553  * more parts.
554  *
555  * Since: 2.40
556  */
557 GInputStream *
soup_multipart_input_stream_next_part_finish(SoupMultipartInputStream * multipart,GAsyncResult * result,GError ** error)558 soup_multipart_input_stream_next_part_finish (SoupMultipartInputStream	*multipart,
559 					      GAsyncResult		*result,
560 					      GError		       **error)
561 {
562 	g_return_val_if_fail (g_task_is_valid (result, multipart), FALSE);
563 
564 	return g_task_propagate_pointer (G_TASK (result), error);
565 }
566 
567 /**
568  * soup_multipart_input_stream_get_headers:
569  * @multipart: a #SoupMultipartInputStream.
570  *
571  * Obtains the headers for the part currently being processed. Note
572  * that the #SoupMessageHeaders that are returned are owned by the
573  * #SoupMultipartInputStream and will be replaced when a call is made
574  * to soup_multipart_input_stream_next_part() or its async
575  * counterpart, so if keeping the headers is required, a copy must be
576  * made.
577  *
578  * Note that if a part had no headers at all an empty #SoupMessageHeaders
579  * will be returned.
580  *
581  * Return value: (nullable) (transfer none): a #SoupMessageHeaders
582  * containing the headers for the part currently being processed or
583  * %NULL if the headers failed to parse.
584  *
585  * Since: 2.40
586  */
587 SoupMessageHeaders *
soup_multipart_input_stream_get_headers(SoupMultipartInputStream * multipart)588 soup_multipart_input_stream_get_headers (SoupMultipartInputStream *multipart)
589 {
590 	return multipart->priv->current_headers;
591 }
592