1 /*
2 * GstCurlHttpSrc
3 * Copyright 2017 British Broadcasting Corporation - Research and Development
4 *
5 * Author: Sam Hurst <samuelh@rd.bbc.co.uk>
6 *
7 * Permission is hereby granted, free of charge, to any person obtaining a
8 * copy of this software and associated documentation files (the "Software"),
9 * to deal in the Software without restriction, including without limitation
10 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11 * and/or sell copies of the Software, and to permit persons to whom the
12 * Software is furnished to do so, subject to the following conditions:
13 *
14 * The above copyright notice and this permission notice shall be included in
15 * all copies or substantial portions of the Software.
16 *
17 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23 * DEALINGS IN THE SOFTWARE.
24 *
25 * Alternatively, the contents of this file may be used under the
26 * GNU Lesser General Public License Version 2.1 (the "LGPL"), in
27 * which case the following provisions apply instead of the ones
28 * mentioned above:
29 *
30 * This library is free software; you can redistribute it and/or
31 * modify it under the terms of the GNU Library General Public
32 * License as published by the Free Software Foundation; either
33 * version 2 of the License, or (at your option) any later version.
34 *
35 * This library is distributed in the hope that it will be useful,
36 * but WITHOUT ANY WARRANTY; without even the implied warranty of
37 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
38 * Library General Public License for more details.
39 *
40 * You should have received a copy of the GNU Library General Public
41 * License along with this library; if not, write to the
42 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
43 * Boston, MA 02111-1307, USA.
44 */
45
46 /**
47 * SECTION:element-curlhttpsrc
48 *
49 * This plugin reads data from a remote location specified by a URI, when the
50 * protocol is 'http' or 'https'.
51 *
52 * It is based on the cURL project (http://curl.haxx.se/) and is specifically
53 * designed to be also used with nghttp2 (http://nghttp2.org) to enable HTTP/2
54 * support for GStreamer. Your libcurl library MUST be compiled against nghttp2
55 * for HTTP/2 support for this functionality. HTTPS support is dependent on
56 * cURL being built with SSL support (OpenSSL/PolarSSL/NSS/GnuTLS).
57 *
58 * An HTTP proxy must be specified by URL.
59 * If the "http_proxy" environment variable is set, its value is used.
60 * The #GstCurlHttpSrc:proxy property can be used to override the default.
61 *
62 * <refsect2>
63 * <title>Example launch line</title>
64 * |[
65 * gst-launch-1.0 curlhttpsrc location=http://127.0.1.1/index.html ! fakesink dump=1
66 * ]| The above pipeline reads a web page from the local machine using HTTP and
67 * dumps it to stdout.
68 * |[
69 * gst-launch-1.0 playbin uri=http://rdmedia.bbc.co.uk/dash/testmpds/multiperiod/bbb.php
70 * ]| The above pipeline will start up a DASH streaming session from the given
71 * MPD file. This requires GStreamer to have been built with dashdemux from
72 * gst-plugins-bad.
73 * </refsect2>
74 */
75
76 /*
77 * Thread safety notes.
78 *
79 * GstCurlHttpSrc uses a single thread running the
80 * gst_curl_http_src_curl_multi_loop() function to handle receiving
81 * data and messages from libcurl. Each instance of GstCurlHttpSrc adds
82 * an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
83 * for the multi_loop to perform the HTTP request.
84 *
85 * When an instance of GstCurlHttpSrc wants to make a request (i.e.
86 * it has moved to the PLAYING state) it adds itself to the
87 * multi_task_context.queue list and signals the multi_loop task.
88 *
89 * Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond
90 * to wait for gst_curl_http_src_curl_multi_loop() to perform the
91 * request and signal completion.
92 *
93 * Each instance of GstCurlHttpSrc is protected by the mutexes:
94 * 1. uri_mutex
95 * 2. buffer_mutex
96 *
97 * uri_mutex is used to protect access to the uri field.
98 *
99 * buffer_mutex is used to protect access to buffer_cond, state and
100 * connection_status.
101 *
102 * The gst_curl_http_src_curl_multi_loop() function uses the mutexes:
103 * 1. multi_task_context.task_rec_mutex
104 * 2. multi_task_context.mutex
105 *
106 * multi_task_context.task_rec_mutex is only used by GstTask.
107 *
108 * multi_task_context.mutex is used to protect access to queue and state
109 *
110 * To avoid deadlock, it is vital that if both multi_task_context.mutex
111 * and buffer_mutex are required, that they are locked in the order:
112 * 1. multi_task_context.mutex
113 * 2. buffer_mutex
114 */
115
116 #ifdef HAVE_CONFIG_H
117 #include <config.h>
118 #endif
119
120 #include <gst/gst-i18n-plugin.h>
121
122 #include "gstcurlhttpsrc.h"
123 #include "gstcurlqueue.h"
124 #include "gstcurldefaults.h"
125
126 GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
127 #define GST_CAT_DEFAULT gst_curl_http_src_debug
128 GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);
129
/*
 * Post an element error for @src with the given error domain (@cat), error
 * @code and human readable @error_message.
 *
 * The debug string and the "details" structure carry the HTTP status code,
 * the reason phrase, the request URL and the redirect URL stored on @src.
 * Every string taken from @src is wrapped in GST_STR_NULL() because these
 * fields start out as NULL and an error can be raised before they are set.
 * The status code is formatted as unsigned to match the G_TYPE_UINT it is
 * published as in the details structure.
 *
 * @src is evaluated more than once; do not pass an expression with side
 * effects.
 */
#define CURL_HTTP_SRC_ERROR(src,cat,code,error_message) \
  do { \
    GST_ELEMENT_ERROR_WITH_DETAILS ((src), cat, code, ("%s", error_message), \
        ("%s (%u), URL: %s, Redirect to: %s", \
            GST_STR_NULL ((src)->reason_phrase), (src)->status_code, \
            GST_STR_NULL ((src)->uri), GST_STR_NULL ((src)->redirect_uri)), \
        ("http-status-code", G_TYPE_UINT, (src)->status_code, \
            "http-redirect-uri", G_TYPE_STRING, \
            GST_STR_NULL ((src)->redirect_uri), NULL)); \
  } while(0)
138
/*
 * GObject property identifiers. PROP_0 is the reserved "no property" id;
 * the remaining values are the ids dispatched on in
 * gst_curl_http_src_set_property() and gst_curl_http_src_get_property().
 */
enum
{
  PROP_0 = 0,

  /* Resource location and credentials */
  PROP_URI,
  PROP_USERNAME,
  PROP_PASSWORD,

  /* Proxy location and credentials */
  PROP_PROXYURI,
  PROP_PROXYUSERNAME,
  PROP_PROXYPASSWORD,

  /* Request contents */
  PROP_COOKIES,
  PROP_USERAGENT,
  PROP_HEADERS,
  PROP_COMPRESS,

  /* Transfer behaviour */
  PROP_REDIRECT,
  PROP_MAXREDIRECT,
  PROP_KEEPALIVE,
  PROP_TIMEOUT,
  PROP_STRICT_SSL,
  PROP_SSL_CA_FILE,
  PROP_RETRIES,

  /* Connection limits */
  PROP_CONNECTIONMAXTIME,
  PROP_MAXCONCURRENT_SERVER,
  PROP_MAXCONCURRENT_PROXY,
  PROP_MAXCONCURRENT_GLOBAL,

  PROP_HTTPVERSION,
  PROP_IRADIO_MODE,

  /* Number of ids above; not a property itself */
  PROP_MAX
};
167
168 /*
169 * Make a source pad template to be able to kick out recv'd data
170 */
171 static GstStaticPadTemplate srcpadtemplate = GST_STATIC_PAD_TEMPLATE ("src",
172 GST_PAD_SRC,
173 GST_PAD_ALWAYS,
174 GST_STATIC_CAPS_ANY);
175
176 /*
177 * Function Definitions
178 */
179 /* Gstreamer generic element functions */
180 static void gst_curl_http_src_set_property (GObject * object, guint prop_id,
181 const GValue * value, GParamSpec * pspec);
182 static void gst_curl_http_src_get_property (GObject * object, guint prop_id,
183 GValue * value, GParamSpec * pspec);
184 static void gst_curl_http_src_ref_multi (GstCurlHttpSrc * src);
185 static void gst_curl_http_src_unref_multi (GstCurlHttpSrc * src);
186 static void gst_curl_http_src_finalize (GObject * obj);
187 static GstFlowReturn gst_curl_http_src_create (GstPushSrc * psrc,
188 GstBuffer ** outbuf);
189 static GstFlowReturn gst_curl_http_src_handle_response (GstCurlHttpSrc * src);
190 static gboolean gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src);
191 static GstStateChangeReturn gst_curl_http_src_change_state (GstElement *
192 element, GstStateChange transition);
193 static void gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src);
194 static gboolean gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query);
195 static gboolean gst_curl_http_src_get_content_length (GstBaseSrc * bsrc,
196 guint64 * size);
197 static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc);
198 static gboolean gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc);
199
200 /* URI Handler functions */
201 static void gst_curl_http_src_uri_handler_init (gpointer g_iface,
202 gpointer iface_data);
203 static guint gst_curl_http_src_urihandler_get_type (GType type);
204 static const gchar *const *gst_curl_http_src_urihandler_get_protocols (GType
205 type);
206 static gchar *gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler);
207 static gboolean gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
208 const gchar * uri, GError ** error);
209
210 /* GstTask functions */
211 static void gst_curl_http_src_curl_multi_loop (gpointer thread_data);
212 static CURL *gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s);
213 static inline void gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src);
214 static size_t gst_curl_http_src_get_header (void *header, size_t size,
215 size_t nmemb, void *src);
216 static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
217 size_t nmemb, void *src);
218 static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
219 static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src);
220 static char *gst_curl_http_src_strcasestr (const char *haystack,
221 const char *needle);
222
223 static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL;
224 static GstCurlHttpVersion pref_http_ver;
225
226 #define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
227
228 static GType
gst_curl_http_version_get_type(void)229 gst_curl_http_version_get_type (void)
230 {
231 static GType gtype = 0;
232
233 if (!gtype) {
234 static const GEnumValue http_versions[] = {
235 {GSTCURL_HTTP_VERSION_1_0, "HTTP Version 1.0", "1.0"},
236 {GSTCURL_HTTP_VERSION_1_1, "HTTP Version 1.1", "1.1"},
237 #ifdef CURL_VERSION_HTTP2
238 {GSTCURL_HTTP_VERSION_2_0, "HTTP Version 2.0", "2.0"},
239 #endif
240 {0, NULL, NULL}
241 };
242 gtype = g_enum_register_static ("GstCurlHttpVersionType", http_versions);
243 }
244 return gtype;
245 }
246
247 #define gst_curl_http_src_parent_class parent_class
248 G_DEFINE_TYPE_WITH_CODE (GstCurlHttpSrc, gst_curl_http_src, GST_TYPE_PUSH_SRC,
249 G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
250 gst_curl_http_src_uri_handler_init));
251
252 static void
gst_curl_http_src_class_init(GstCurlHttpSrcClass * klass)253 gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
254 {
255 GObjectClass *gobject_class;
256 GstElementClass *gstelement_class;
257 GstBaseSrcClass *gstbasesrc_class;
258 GstPushSrcClass *gstpushsrc_class;
259 const gchar *http_env;
260 GstCurlHttpVersion default_http_version;
261
262 gobject_class = (GObjectClass *) klass;
263 gstelement_class = (GstElementClass *) klass;
264 gstbasesrc_class = (GstBaseSrcClass *) klass;
265 gstpushsrc_class = (GstPushSrcClass *) klass;
266
267 GST_DEBUG_CATEGORY_INIT (gst_curl_http_src_debug, "curlhttpsrc",
268 0, "UriHandler for libcURL");
269
270 GST_INFO_OBJECT (klass, "class_init started!");
271
272 gstelement_class->change_state =
273 GST_DEBUG_FUNCPTR (gst_curl_http_src_change_state);
274 gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_curl_http_src_create);
275 gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_curl_http_src_query);
276 gstbasesrc_class->get_size =
277 GST_DEBUG_FUNCPTR (gst_curl_http_src_get_content_length);
278 gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock);
279 gstbasesrc_class->unlock_stop =
280 GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock_stop);
281
282 gst_element_class_add_pad_template (gstelement_class,
283 gst_static_pad_template_get (&srcpadtemplate));
284
285 gst_curl_http_src_curl_capabilities = curl_version_info (CURLVERSION_NOW);
286 #ifdef CURL_VERSION_HTTP2
287 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
288 default_http_version = GSTCURL_HTTP_VERSION_2_0;
289 } else
290 #endif
291 default_http_version = GSTCURL_HTTP_VERSION_1_1;
292
293 http_env = g_getenv ("GST_CURL_HTTP_VER");
294 if (http_env != NULL) {
295 GST_INFO_OBJECT (klass, "Seen env var GST_CURL_HTTP_VER with value %s",
296 http_env);
297 if (!strcmp (http_env, "1.0")) {
298 pref_http_ver = GSTCURL_HTTP_VERSION_1_0;
299 } else if (!strcmp (http_env, "1.1")) {
300 pref_http_ver = GSTCURL_HTTP_VERSION_1_1;
301 } else if (!strcmp (http_env, "2.0")) {
302 #ifdef CURL_VERSION_HTTP2
303 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
304 pref_http_ver = GSTCURL_HTTP_VERSION_2_0;
305 } else {
306 goto unsupported_http_version;
307 }
308 #endif
309 } else {
310 unsupported_http_version:
311 GST_WARNING_OBJECT (klass,
312 "Unsupported HTTP version: %s. Fallback to default", http_env);
313 pref_http_ver = default_http_version;
314 }
315 } else {
316 pref_http_ver = default_http_version;
317 }
318
319 gobject_class->set_property = gst_curl_http_src_set_property;
320 gobject_class->get_property = gst_curl_http_src_get_property;
321 gobject_class->finalize = gst_curl_http_src_finalize;
322
323 g_object_class_install_property (gobject_class, PROP_URI,
324 g_param_spec_string ("location", "Location", "URI of resource to read",
325 GSTCURL_HANDLE_DEFAULT_CURLOPT_URL,
326 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327
328 g_object_class_install_property (gobject_class, PROP_USERNAME,
329 g_param_spec_string ("user-id", "user-id",
330 "HTTP location URI user id for authentication",
331 GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME,
332 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333
334 g_object_class_install_property (gobject_class, PROP_PASSWORD,
335 g_param_spec_string ("user-pw", "user-pw",
336 "HTTP location URI password for authentication",
337 GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD,
338 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339
340 g_object_class_install_property (gobject_class, PROP_PROXYURI,
341 g_param_spec_string ("proxy", "Proxy", "URI of HTTP proxy server",
342 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY,
343 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344
345 g_object_class_install_property (gobject_class, PROP_PROXYUSERNAME,
346 g_param_spec_string ("proxy-id", "proxy-id",
347 "HTTP proxy URI user id for authentication",
348 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME,
349 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350
351 g_object_class_install_property (gobject_class, PROP_PROXYPASSWORD,
352 g_param_spec_string ("proxy-pw", "proxy-pw",
353 "HTTP proxy URI password for authentication",
354 GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD,
355 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356
357 g_object_class_install_property (gobject_class, PROP_COOKIES,
358 g_param_spec_boxed ("cookies", "Cookies", "List of HTTP Cookies",
359 G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
360
361 g_object_class_install_property (gobject_class, PROP_USERAGENT,
362 g_param_spec_string ("user-agent", "User-Agent",
363 "URI of resource requested",
364 GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>",
365 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366
367 g_object_class_install_property (gobject_class, PROP_COMPRESS,
368 g_param_spec_boolean ("compress", "Compress",
369 "Allow compressed content encodings",
370 GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING,
371 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372
373 g_object_class_install_property (gobject_class, PROP_REDIRECT,
374 g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
375 "Allow HTTP Redirections (HTTP Status Code 300 series)",
376 GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION,
377 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378
379 g_object_class_install_property (gobject_class, PROP_MAXREDIRECT,
380 g_param_spec_int ("max-redirect", "Max-Redirect",
381 "Maximum number of permitted redirections. -1 is unlimited.",
382 GSTCURL_HANDLE_MIN_CURLOPT_MAXREDIRS,
383 GSTCURL_HANDLE_MAX_CURLOPT_MAXREDIRS,
384 GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS,
385 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386
387 g_object_class_install_property (gobject_class, PROP_KEEPALIVE,
388 g_param_spec_boolean ("keep-alive", "Keep-Alive",
389 "Toggle keep-alive for connection reuse.",
390 GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE,
391 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
394 g_param_spec_int ("timeout", "Timeout",
395 "Value in seconds before timeout a blocking request (0 = no timeout)",
396 GSTCURL_HANDLE_MIN_CURLOPT_TIMEOUT,
397 GSTCURL_HANDLE_MAX_CURLOPT_TIMEOUT,
398 GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT,
399 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
400
401 g_object_class_install_property (gobject_class, PROP_HEADERS,
402 g_param_spec_boxed ("extra-headers", "Extra Headers",
403 "Extra headers to append to the HTTP request",
404 GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
405
406 g_object_class_install_property (gobject_class, PROP_STRICT_SSL,
407 g_param_spec_boolean ("ssl-strict", "SSL Strict",
408 "Strict SSL certificate checking",
409 GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER,
410 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
411
412 g_object_class_install_property (gobject_class, PROP_SSL_CA_FILE,
413 g_param_spec_string ("ssl-ca-file", "SSL CA File",
414 "Location of an SSL CA file to use for checking SSL certificates",
415 GSTCURL_HANDLE_DEFAULT_CURLOPT_CAINFO,
416 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
417
418 g_object_class_install_property (gobject_class, PROP_RETRIES,
419 g_param_spec_int ("retries", "Retries",
420 "Maximum number of retries until giving up (-1=infinite)",
421 GSTCURL_HANDLE_MIN_RETRIES, GSTCURL_HANDLE_MAX_RETRIES,
422 GSTCURL_HANDLE_DEFAULT_RETRIES,
423 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424
425 g_object_class_install_property (gobject_class, PROP_CONNECTIONMAXTIME,
426 g_param_spec_uint ("max-connection-time", "Max-Connection-Time",
427 "Maximum amount of time to keep-alive HTTP connections",
428 GSTCURL_MIN_CONNECTION_TIME, GSTCURL_MAX_CONNECTION_TIME,
429 GSTCURL_DEFAULT_CONNECTION_TIME,
430 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431
432 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_SERVER,
433 g_param_spec_uint ("max-connections-per-server",
434 "Max-Connections-Per-Server",
435 "Maximum number of connections allowed per server for HTTP/1.x",
436 GSTCURL_MIN_CONNECTIONS_SERVER, GSTCURL_MAX_CONNECTIONS_SERVER,
437 GSTCURL_DEFAULT_CONNECTIONS_SERVER,
438 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439
440 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_PROXY,
441 g_param_spec_uint ("max-connections-per-proxy",
442 "Max-Connections-Per-Proxy",
443 "Maximum number of concurrent connections allowed per proxy for HTTP/1.x",
444 GSTCURL_MIN_CONNECTIONS_PROXY, GSTCURL_MAX_CONNECTIONS_PROXY,
445 GSTCURL_DEFAULT_CONNECTIONS_PROXY,
446 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447
448 g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_GLOBAL,
449 g_param_spec_uint ("max-connections", "Max-Connections",
450 "Maximum number of concurrent connections allowed for HTTP/1.x",
451 GSTCURL_MIN_CONNECTIONS_GLOBAL, GSTCURL_MAX_CONNECTIONS_GLOBAL,
452 GSTCURL_DEFAULT_CONNECTIONS_GLOBAL,
453 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
454
455 g_object_class_install_property (gobject_class, PROP_HTTPVERSION,
456 g_param_spec_enum ("http-version", "HTTP-Version",
457 "The preferred HTTP protocol version",
458 GST_TYPE_CURL_HTTP_VERSION, pref_http_ver,
459 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
460
461 /* Add a debugging task so it's easier to debug in the Multi worker thread */
462 GST_DEBUG_CATEGORY_INIT (gst_curl_loop_debug, "curl_multi_loop", 0,
463 "libcURL loop thread debugging");
464 #ifndef GST_DISABLE_GST_DEBUG
465 gst_debug_log (gst_curl_loop_debug, GST_LEVEL_INFO, __FILE__, __func__,
466 __LINE__, NULL, "Testing the curl_multi_loop debugging prints");
467 #endif
468
469 klass->multi_task_context.task = NULL;
470 klass->multi_task_context.refcount = 0;
471 klass->multi_task_context.queue = NULL;
472 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
473 klass->multi_task_context.multi_handle = NULL;
474 g_mutex_init (&klass->multi_task_context.mutex);
475 g_cond_init (&klass->multi_task_context.signal);
476
477 gst_element_class_set_static_metadata (gstelement_class,
478 "HTTP Client Source using libcURL",
479 "Source/Network",
480 "Receiver data as a client over a network via HTTP using cURL",
481 "Sam Hurst <samuelh@rd.bbc.co.uk>");
482 }
483
484 static void
gst_curl_http_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)485 gst_curl_http_src_set_property (GObject * object, guint prop_id,
486 const GValue * value, GParamSpec * pspec)
487 {
488 GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
489 GSTCURL_FUNCTION_ENTRY (source);
490
491 switch (prop_id) {
492 case PROP_URI:
493 g_mutex_lock (&source->uri_mutex);
494 g_free (source->uri);
495 source->uri = g_value_dup_string (value);
496 g_mutex_unlock (&source->uri_mutex);
497 break;
498 case PROP_USERNAME:
499 g_free (source->username);
500 source->username = g_value_dup_string (value);
501 break;
502 case PROP_PASSWORD:
503 g_free (source->password);
504 source->password = g_value_dup_string (value);
505 break;
506 case PROP_PROXYURI:
507 g_free (source->proxy_uri);
508 source->proxy_uri = g_value_dup_string (value);
509 break;
510 case PROP_PROXYUSERNAME:
511 g_free (source->proxy_user);
512 source->proxy_user = g_value_dup_string (value);
513 break;
514 case PROP_PROXYPASSWORD:
515 g_free (source->proxy_pass);
516 source->proxy_pass = g_value_dup_string (value);
517 break;
518 case PROP_COOKIES:
519 g_strfreev (source->cookies);
520 source->cookies = g_strdupv (g_value_get_boxed (value));
521 source->number_cookies = g_strv_length (source->cookies);
522 break;
523 case PROP_USERAGENT:
524 g_free (source->user_agent);
525 source->user_agent = g_value_dup_string (value);
526 break;
527 case PROP_HEADERS:
528 {
529 const GstStructure *s = gst_value_get_structure (value);
530 if (source->request_headers)
531 gst_structure_free (source->request_headers);
532 source->request_headers =
533 s ? gst_structure_copy (s) :
534 gst_structure_new_empty (REQUEST_HEADERS_NAME);
535 }
536 break;
537 case PROP_COMPRESS:
538 source->accept_compressed_encodings = g_value_get_boolean (value);
539 break;
540 case PROP_REDIRECT:
541 source->allow_3xx_redirect = g_value_get_boolean (value);
542 break;
543 case PROP_MAXREDIRECT:
544 source->max_3xx_redirects = g_value_get_int (value);
545 break;
546 case PROP_KEEPALIVE:
547 source->keep_alive = g_value_get_boolean (value);
548 break;
549 case PROP_TIMEOUT:
550 source->timeout_secs = g_value_get_int (value);
551 break;
552 case PROP_STRICT_SSL:
553 source->strict_ssl = g_value_get_boolean (value);
554 break;
555 case PROP_SSL_CA_FILE:
556 source->custom_ca_file = g_value_dup_string (value);
557 break;
558 case PROP_RETRIES:
559 source->total_retries = g_value_get_int (value);
560 break;
561 case PROP_CONNECTIONMAXTIME:
562 source->max_connection_time = g_value_get_uint (value);
563 break;
564 case PROP_MAXCONCURRENT_SERVER:
565 source->max_conns_per_server = g_value_get_uint (value);
566 break;
567 case PROP_MAXCONCURRENT_PROXY:
568 source->max_conns_per_proxy = g_value_get_uint (value);
569 break;
570 case PROP_MAXCONCURRENT_GLOBAL:
571 source->max_conns_global = g_value_get_uint (value);
572 break;
573 case PROP_HTTPVERSION:
574 source->preferred_http_version = g_value_get_enum (value);
575 break;
576 default:
577 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
578 break;
579 }
580 GSTCURL_FUNCTION_EXIT (source);
581 }
582
583 static void
gst_curl_http_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)584 gst_curl_http_src_get_property (GObject * object, guint prop_id,
585 GValue * value, GParamSpec * pspec)
586 {
587 GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
588 GSTCURL_FUNCTION_ENTRY (source);
589
590 switch (prop_id) {
591 case PROP_URI:
592 g_mutex_lock (&source->uri_mutex);
593 g_value_set_string (value, source->uri);
594 g_mutex_unlock (&source->uri_mutex);
595 break;
596 case PROP_USERNAME:
597 g_value_set_string (value, source->username);
598 break;
599 case PROP_PASSWORD:
600 g_value_set_string (value, source->password);
601 break;
602 case PROP_PROXYURI:
603 g_value_set_string (value, source->proxy_uri);
604 break;
605 case PROP_PROXYUSERNAME:
606 g_value_set_string (value, source->proxy_user);
607 break;
608 case PROP_PROXYPASSWORD:
609 g_value_set_string (value, source->proxy_pass);
610 break;
611 case PROP_COOKIES:
612 g_value_set_boxed (value, source->cookies);
613 break;
614 case PROP_USERAGENT:
615 g_value_set_string (value, source->user_agent);
616 break;
617 case PROP_HEADERS:
618 gst_value_set_structure (value, source->request_headers);
619 break;
620 case PROP_COMPRESS:
621 g_value_set_boolean (value, source->accept_compressed_encodings);
622 break;
623 case PROP_REDIRECT:
624 g_value_set_boolean (value, source->allow_3xx_redirect);
625 break;
626 case PROP_MAXREDIRECT:
627 g_value_set_int (value, source->max_3xx_redirects);
628 break;
629 case PROP_KEEPALIVE:
630 g_value_set_boolean (value, source->keep_alive);
631 break;
632 case PROP_TIMEOUT:
633 g_value_set_int (value, source->timeout_secs);
634 break;
635 case PROP_STRICT_SSL:
636 g_value_set_boolean (value, source->strict_ssl);
637 break;
638 case PROP_SSL_CA_FILE:
639 g_value_set_string (value, source->custom_ca_file);
640 break;
641 case PROP_RETRIES:
642 g_value_set_int (value, source->total_retries);
643 break;
644 case PROP_CONNECTIONMAXTIME:
645 g_value_set_uint (value, source->max_connection_time);
646 break;
647 case PROP_MAXCONCURRENT_SERVER:
648 g_value_set_uint (value, source->max_conns_per_server);
649 break;
650 case PROP_MAXCONCURRENT_PROXY:
651 g_value_set_uint (value, source->max_conns_per_proxy);
652 break;
653 case PROP_MAXCONCURRENT_GLOBAL:
654 g_value_set_uint (value, source->max_conns_global);
655 break;
656 case PROP_HTTPVERSION:
657 g_value_set_enum (value, source->preferred_http_version);
658 break;
659 default:
660 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
661 break;
662 }
663 GSTCURL_FUNCTION_EXIT (source);
664 }
665
666 static void
gst_curl_http_src_init(GstCurlHttpSrc * source)667 gst_curl_http_src_init (GstCurlHttpSrc * source)
668 {
669 GSTCURL_FUNCTION_ENTRY (source);
670
671 /* Assume everything is already free'd */
672 source->uri = NULL;
673 source->redirect_uri = NULL;
674 source->username = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME;
675 source->password = GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD;
676 source->proxy_uri = NULL;
677 source->proxy_user = NULL;
678 source->proxy_pass = NULL;
679 source->cookies = NULL;
680 g_assert (gst_curl_http_src_curl_capabilities != NULL);
681 source->user_agent =
682 g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s",
683 gst_curl_http_src_curl_capabilities->version);
684 source->number_cookies = 0;
685 source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME);
686 source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
687 source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
688 source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
689 source->timeout_secs = GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT;
690 source->max_connection_time = GSTCURL_DEFAULT_CONNECTION_TIME;
691 source->max_conns_per_server = GSTCURL_DEFAULT_CONNECTIONS_SERVER;
692 source->max_conns_per_proxy = GSTCURL_DEFAULT_CONNECTIONS_PROXY;
693 source->max_conns_global = GSTCURL_DEFAULT_CONNECTIONS_GLOBAL;
694 source->strict_ssl = GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER;
695 source->custom_ca_file = NULL;
696 source->preferred_http_version = pref_http_ver;
697 source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
698 source->retries_remaining = source->total_retries;
699 source->slist = NULL;
700 source->accept_compressed_encodings = FALSE;
701
702 gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
703
704 source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
705 source->no_proxy_list = g_strdup (g_getenv ("no_proxy"));
706
707 g_mutex_init (&source->uri_mutex);
708 g_mutex_init (&source->buffer_mutex);
709 g_cond_init (&source->buffer_cond);
710
711 source->buffer = NULL;
712 source->buffer_len = 0;
713 source->state = GSTCURL_NONE;
714 source->pending_state = GSTCURL_NONE;
715 source->transfer_begun = FALSE;
716 source->data_received = FALSE;
717 source->connection_status = GSTCURL_NOT_CONNECTED;
718
719 source->http_headers = NULL;
720 source->content_type = NULL;
721 source->status_code = 0;
722 source->reason_phrase = NULL;
723 source->hdrs_updated = FALSE;
724 source->curl_result = CURLE_OK;
725 gst_caps_replace (&source->caps, NULL);
726
727 GSTCURL_FUNCTION_EXIT (source);
728 }
729
730 /*
731 * Check if the Curl multi loop has been started. If not, initialise it and
732 * start it running. If it is already running, increment the refcount.
733 */
734 static void
gst_curl_http_src_ref_multi(GstCurlHttpSrc * src)735 gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
736 {
737 GstCurlHttpSrcClass *klass;
738
739 GSTCURL_FUNCTION_ENTRY (src);
740
741 /*klass = (GstCurlHttpSrcClass) g_type_class_peek_parent (src); */
742 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
743 GstCurlHttpSrcClass);
744
745 g_mutex_lock (&klass->multi_task_context.mutex);
746 if (klass->multi_task_context.refcount == 0) {
747 /* Set up various in-task properties */
748
749 /* NULL is treated as the start of the list, no need to allocate. */
750 klass->multi_task_context.queue = NULL;
751
752 /* set up curl */
753 klass->multi_task_context.multi_handle = curl_multi_init ();
754
755 curl_multi_setopt (klass->multi_task_context.multi_handle,
756 CURLMOPT_PIPELINING, 1);
757 #ifdef CURLMOPT_MAX_HOST_CONNECTIONS
758 curl_multi_setopt (klass->multi_task_context.multi_handle,
759 CURLMOPT_MAX_HOST_CONNECTIONS, 1);
760 #endif
761
762 /* Start the thread */
763 g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
764 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
765 klass->multi_task_context.task = gst_task_new (
766 (GstTaskFunction) gst_curl_http_src_curl_multi_loop,
767 (gpointer) & klass->multi_task_context, NULL);
768 gst_task_set_lock (klass->multi_task_context.task,
769 &klass->multi_task_context.task_rec_mutex);
770 if (gst_task_start (klass->multi_task_context.task) == FALSE) {
771 /*
772 * This is a pretty critical failure and is not recoverable, so commit
773 * sudoku and run away.
774 */
775 GSTCURL_ERROR_PRINT ("Couldn't start curl_multi task! Aborting.");
776 abort ();
777 }
778 GSTCURL_INFO_PRINT ("Curl multi loop has been correctly initialised!");
779 }
780 klass->multi_task_context.refcount++;
781 g_mutex_unlock (&klass->multi_task_context.mutex);
782
783 GSTCURL_FUNCTION_EXIT (src);
784 }
785
786 /*
787 * Decrement the reference count on the curl multi loop. If this is called by
788 * the last instance to hold a reference, shut down the worker. (Otherwise
789 * GStreamer can't close down with a thread still running). Also offers the
790 * "force_all" boolean parameter, which if TRUE removes all references and shuts
791 * down.
792 */
793 static void
gst_curl_http_src_unref_multi(GstCurlHttpSrc * src)794 gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
795 {
796 GstCurlHttpSrcClass *klass;
797
798 GSTCURL_FUNCTION_ENTRY (src);
799
800 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
801 GstCurlHttpSrcClass);
802
803 g_mutex_lock (&klass->multi_task_context.mutex);
804 klass->multi_task_context.refcount--;
805 GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
806 klass->multi_task_context.refcount);
807
808 if (klass->multi_task_context.refcount == 0) {
809 /* Everything's done! Clean up. */
810 gst_task_stop (klass->multi_task_context.task);
811 klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
812 g_cond_signal (&klass->multi_task_context.signal);
813 g_mutex_unlock (&klass->multi_task_context.mutex);
814 GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task...");
815 gst_task_join (klass->multi_task_context.task);
816 gst_object_unref (klass->multi_task_context.task);
817 klass->multi_task_context.task = NULL;
818 curl_multi_cleanup (klass->multi_task_context.multi_handle);
819 klass->multi_task_context.multi_handle = NULL;
820 g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex);
821 GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete");
822 } else {
823 g_mutex_unlock (&klass->multi_task_context.mutex);
824 }
825
826 GSTCURL_FUNCTION_EXIT (src);
827 }
828
829 static void
gst_curl_http_src_finalize(GObject * obj)830 gst_curl_http_src_finalize (GObject * obj)
831 {
832 GstCurlHttpSrc *src = GST_CURLHTTPSRC (obj);
833
834 GSTCURL_FUNCTION_ENTRY (src);
835
836 /* Cleanup all memory allocated */
837 gst_curl_http_src_cleanup_instance (src);
838
839 GSTCURL_FUNCTION_EXIT (src);
840
841 /* Chain up to parent class */
842 G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj);
843 }
844
845 /*
846 * Do the transfer. If the transfer hasn't begun yet, start a new curl handle
847 * and pass it to the multi queue to be operated on. Then wait for any blocks
848 * of data and push them to the source pad.
849 */
850 static GstFlowReturn
gst_curl_http_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)851 gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
852 {
853 GstFlowReturn ret;
854 GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
855 GstCurlHttpSrcClass *klass;
856 GstStructure *empty_headers;
857 GstBaseSrc *basesrc;
858
859 GSTCURL_FUNCTION_ENTRY (src);
860
861 klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
862 GstCurlHttpSrcClass);
863 basesrc = GST_BASE_SRC_CAST (src);
864
865 retry:
866 ret = GST_FLOW_OK;
867 /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
868 needed, multi_task_context.mutex must be acquired first */
869 g_mutex_lock (&klass->multi_task_context.mutex);
870 g_mutex_lock (&src->buffer_mutex);
871 if (src->state == GSTCURL_UNLOCK) {
872 ret = GST_FLOW_FLUSHING;
873 goto escape;
874 }
875
876 if (!src->transfer_begun) {
877 GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
878 /* Create the Easy Handle and set up the session. */
879 src->curl_handle = gst_curl_http_src_create_easy_handle (src);
880 if (src->curl_handle == NULL) {
881 ret = GST_FLOW_ERROR;
882 goto escape;
883 }
884
885 if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
886 == FALSE) {
887 GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
888 ret = GST_FLOW_ERROR;
889 goto escape;
890 }
891 /* Signal the worker thread */
892 g_cond_signal (&klass->multi_task_context.signal);
893
894 src->state = GSTCURL_OK;
895 src->transfer_begun = TRUE;
896 src->data_received = FALSE;
897
898 GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
899
900 if (src->http_headers != NULL) {
901 gst_structure_free (src->http_headers);
902 }
903 empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
904 src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
905 URI_NAME, G_TYPE_STRING, src->uri,
906 REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
907 RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
908 gst_structure_free (empty_headers);
909 GST_INFO_OBJECT (src, "Created a new headers object");
910 }
911
912 g_mutex_unlock (&klass->multi_task_context.mutex);
913
914 /* Wait for data to become available, then punt it downstream */
915 while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)
916 && (src->connection_status == GSTCURL_CONNECTED)) {
917 g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
918 }
919
920 if (src->state == GSTCURL_UNLOCK) {
921 if (src->buffer_len > 0) {
922 g_free (src->buffer);
923 src->buffer = NULL;
924 src->buffer_len = 0;
925 }
926 g_mutex_unlock (&src->buffer_mutex);
927 return GST_FLOW_FLUSHING;
928 }
929
930 ret = gst_curl_http_src_handle_response (src);
931 switch (ret) {
932 case GST_FLOW_ERROR:
933 /* Don't attempt a retry, just bomb out */
934 g_mutex_unlock (&src->buffer_mutex);
935 return ret;
936 case GST_FLOW_CUSTOM_ERROR:
937 if (src->data_received == TRUE) {
938 /*
939 * If data has already been received, we can't recall previously sent
940 * buffers so don't attempt a retry in this case.
941 *
942 * TODO: Remember the position we got to, and make a range request for
943 * the resource without the bit we've already received?
944 */
945 GST_WARNING_OBJECT (src,
946 "Failed mid-transfer, can't continue for URI %s", src->uri);
947 g_mutex_unlock (&src->buffer_mutex);
948 return GST_FLOW_ERROR;
949 }
950 src->retries_remaining--;
951 if (src->retries_remaining == 0) {
952 GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
953 g_mutex_unlock (&src->buffer_mutex);
954 return GST_FLOW_ERROR; /* Don't attempt a retry, just bomb out */
955 }
956 GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
957 src->state = GSTCURL_NONE;
958 src->transfer_begun = FALSE;
959 src->status_code = 0;
960 g_free (src->reason_phrase);
961 src->reason_phrase = NULL;
962 src->hdrs_updated = FALSE;
963 if (src->http_headers != NULL) {
964 gst_structure_free (src->http_headers);
965 src->http_headers = NULL;
966 GST_INFO_OBJECT (src, "NULL'd the headers");
967 }
968 gst_curl_http_src_destroy_easy_handle (src);
969 g_mutex_unlock (&src->buffer_mutex);
970 goto retry; /* Attempt a retry! */
971 default:
972 break;
973 }
974
975 if (((src->state == GSTCURL_OK) || (src->state == GSTCURL_DONE)) &&
976 (src->buffer_len > 0)) {
977
978 GST_DEBUG_OBJECT (src, "Pushing %u bytes of transfer for URI %s to pad",
979 src->buffer_len, src->uri);
980 *outbuf = gst_buffer_new_allocate (NULL, src->buffer_len, NULL);
981 gst_buffer_fill (*outbuf, 0, src->buffer, src->buffer_len);
982 GST_BUFFER_OFFSET (*outbuf) = basesrc->segment.position;
983
984 g_free (src->buffer);
985 src->buffer = NULL;
986 src->buffer_len = 0;
987 src->data_received = TRUE;
988
989 /* ret should still be GST_FLOW_OK */
990 } else if ((src->state == GSTCURL_DONE) && (src->buffer_len == 0)) {
991 GST_INFO_OBJECT (src, "Full body received, signalling EOS for URI %s.",
992 src->uri);
993 src->state = GSTCURL_NONE;
994 src->transfer_begun = FALSE;
995 src->status_code = 0;
996 g_free (src->reason_phrase);
997 src->reason_phrase = NULL;
998 src->hdrs_updated = FALSE;
999 gst_curl_http_src_destroy_easy_handle (src);
1000 ret = GST_FLOW_EOS;
1001 } else {
1002 switch (src->state) {
1003 case GSTCURL_NONE:
1004 GST_WARNING_OBJECT (src, "Got unexpected GSTCURL_NONE state!");
1005 break;
1006 case GSTCURL_REMOVED:
1007 GST_WARNING_OBJECT (src, "Transfer got removed from the curl queue");
1008 ret = GST_FLOW_EOS;
1009 break;
1010 case GSTCURL_BAD_QUEUE_REQUEST:
1011 GST_ERROR_OBJECT (src, "Bad Queue Request!");
1012 ret = GST_FLOW_ERROR;
1013 break;
1014 case GSTCURL_TOTAL_ERROR:
1015 GST_ERROR_OBJECT (src, "Critical, unrecoverable error!");
1016 ret = GST_FLOW_ERROR;
1017 break;
1018 case GSTCURL_PIPELINE_NULL:
1019 GST_ERROR_OBJECT (src, "Pipeline null");
1020 break;
1021 default:
1022 GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
1023 }
1024 }
1025 g_mutex_unlock (&src->buffer_mutex);
1026 GSTCURL_FUNCTION_EXIT (src);
1027 return ret;
1028
1029 escape:
1030 g_mutex_unlock (&src->buffer_mutex);
1031 g_mutex_unlock (&klass->multi_task_context.mutex);
1032
1033 GSTCURL_FUNCTION_EXIT (src);
1034 return ret;
1035 }
1036
1037 /*
1038 * Convert header from a GstStructure type to a curl_slist type that curl will
1039 * understand.
1040 */
1041 static gboolean
_headers_to_curl_slist(GQuark field_id,const GValue * value,gpointer ptr)1042 _headers_to_curl_slist (GQuark field_id, const GValue * value, gpointer ptr)
1043 {
1044 gchar *field;
1045 struct curl_slist **p_slist = ptr;
1046
1047 field = g_strdup_printf ("%s: %s", g_quark_to_string (field_id),
1048 g_value_get_string (value));
1049
1050 *p_slist = curl_slist_append (*p_slist, field);
1051
1052 g_free (field);
1053
1054 return TRUE;
1055 }
1056
1057 /*
1058 * From the data in the queue element s, create a CURL easy handle and populate
1059 * options with the URL, proxy data, login options, cookies,
1060 */
1061 static CURL *
gst_curl_http_src_create_easy_handle(GstCurlHttpSrc * s)1062 gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
1063 {
1064 CURL *handle;
1065 gint i;
1066 GSTCURL_FUNCTION_ENTRY (s);
1067
1068 /* This is mandatory and yet not default option, so if this is NULL
1069 * then something very bad is going on. */
1070 if (s->uri == NULL) {
1071 GST_ERROR_OBJECT (s, "No URI for curl!");
1072 return NULL;
1073 }
1074
1075 handle = curl_easy_init ();
1076 if (handle == NULL) {
1077 GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
1078 return NULL;
1079 }
1080 GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
1081
1082 gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
1083 gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
1084 gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
1085 gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
1086 gst_curl_setopt_str (s, handle, CURLOPT_NOPROXY, s->no_proxy_list);
1087 gst_curl_setopt_str (s, handle, CURLOPT_PROXYUSERNAME, s->proxy_user);
1088 gst_curl_setopt_str (s, handle, CURLOPT_PROXYPASSWORD, s->proxy_pass);
1089
1090 for (i = 0; i < s->number_cookies; i++) {
1091 gst_curl_setopt_str (s, handle, CURLOPT_COOKIELIST, s->cookies[i]);
1092 }
1093
1094 /* curl_slist_append dynamically allocates memory, but I need to free it */
1095 if (s->request_headers != NULL) {
1096 gst_structure_foreach (s->request_headers, _headers_to_curl_slist,
1097 &s->slist);
1098 if (curl_easy_setopt (handle, CURLOPT_HTTPHEADER, s->slist) != CURLE_OK) {
1099 GST_WARNING_OBJECT (s, "Failed to set HTTP headers!");
1100 }
1101 }
1102
1103 gst_curl_setopt_str_default (s, handle, CURLOPT_USERAGENT, s->user_agent);
1104
1105 /*
1106 * Unlike soup, this isn't a binary op, curl wants a string here. So if it's
1107 * TRUE, simply set the value as an empty string as this allows both gzip and
1108 * zlib compression methods.
1109 */
1110 if (s->accept_compressed_encodings == TRUE) {
1111 gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "");
1112 } else {
1113 gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "identity");
1114 }
1115
1116 gst_curl_setopt_int (s, handle, CURLOPT_FOLLOWLOCATION,
1117 s->allow_3xx_redirect);
1118 gst_curl_setopt_int_default (s, handle, CURLOPT_MAXREDIRS,
1119 s->max_3xx_redirects);
1120 gst_curl_setopt_bool (s, handle, CURLOPT_TCP_KEEPALIVE, s->keep_alive);
1121 gst_curl_setopt_int (s, handle, CURLOPT_TIMEOUT, s->timeout_secs);
1122 gst_curl_setopt_bool (s, handle, CURLOPT_SSL_VERIFYPEER, s->strict_ssl);
1123 gst_curl_setopt_str (s, handle, CURLOPT_CAINFO, s->custom_ca_file);
1124
1125 switch (s->preferred_http_version) {
1126 case GSTCURL_HTTP_VERSION_1_0:
1127 GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.0");
1128 gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1129 CURL_HTTP_VERSION_1_0);
1130 break;
1131 case GSTCURL_HTTP_VERSION_1_1:
1132 GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.1");
1133 gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1134 CURL_HTTP_VERSION_1_1);
1135 break;
1136 #ifdef CURL_VERSION_HTTP2
1137 case GSTCURL_HTTP_VERSION_2_0:
1138 GST_DEBUG_OBJECT (s, "Setting version as HTTP/2.0");
1139 if (curl_easy_setopt (handle, CURLOPT_HTTP_VERSION,
1140 CURL_HTTP_VERSION_2_0) != CURLE_OK) {
1141 if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
1142 GST_WARNING_OBJECT (s,
1143 "Cannot set unsupported option CURLOPT_HTTP_VERSION");
1144 } else {
1145 GST_INFO_OBJECT (s, "HTTP/2 unsupported by libcurl at this time");
1146 }
1147 }
1148 break;
1149 #endif
1150 default:
1151 GST_WARNING_OBJECT (s,
1152 "Supplied a bogus HTTP version, using curl default!");
1153 }
1154
1155 gst_curl_setopt_generic (s, handle, CURLOPT_HEADERFUNCTION,
1156 gst_curl_http_src_get_header);
1157 gst_curl_setopt_str (s, handle, CURLOPT_HEADERDATA, s);
1158 gst_curl_setopt_generic (s, handle, CURLOPT_WRITEFUNCTION,
1159 gst_curl_http_src_get_chunks);
1160 gst_curl_setopt_str (s, handle, CURLOPT_WRITEDATA, s);
1161
1162 gst_curl_setopt_str (s, handle, CURLOPT_ERRORBUFFER, s->curl_errbuf);
1163
1164 GSTCURL_FUNCTION_EXIT (s);
1165 return handle;
1166 }
1167
1168 /*
1169 * Check the return type from the curl transfer. If it was okay, then deal with
1170 * any headers that were received. Headers should only be dealt with once - but
1171 * we might get a second set if there are trailing headers (RFC7230 Section 4.4)
1172 */
1173 static GstFlowReturn
gst_curl_http_src_handle_response(GstCurlHttpSrc * src)1174 gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
1175 {
1176 glong curl_info_long;
1177 gdouble curl_info_dbl;
1178 gchar *redirect_url;
1179 GstBaseSrc *basesrc;
1180 const GValue *response_headers;
1181 GstFlowReturn ret = GST_FLOW_OK;
1182
1183 GSTCURL_FUNCTION_ENTRY (src);
1184
1185 GST_TRACE_OBJECT (src, "status code: %d (%s), curl return code %d",
1186 src->status_code, src->reason_phrase, src->curl_result);
1187
1188 /* Check the curl result code first - anything not 0 is probably a failure */
1189 if (src->curl_result != 0) {
1190 GST_WARNING_OBJECT (src, "Curl failed the transfer (%d): %s",
1191 src->curl_result, curl_easy_strerror (src->curl_result));
1192 GST_DEBUG_OBJECT (src, "Reason for curl failure: %s", src->curl_errbuf);
1193 return GST_FLOW_ERROR;
1194 }
1195
1196 /*
1197 * What response code do we have?
1198 */
1199 if (src->status_code >= 400) {
1200 GST_WARNING_OBJECT (src, "Transfer for URI %s returned error status %u",
1201 src->uri, src->status_code);
1202 src->retries_remaining = 0;
1203 CURL_HTTP_SRC_ERROR (src, RESOURCE, NOT_FOUND, (src->reason_phrase));
1204 return GST_FLOW_ERROR;
1205 } else if (src->status_code == 0) {
1206 if (curl_easy_getinfo (src->curl_handle, CURLINFO_TOTAL_TIME,
1207 &curl_info_dbl) != CURLE_OK) {
1208 /* Curl cannot be relied on in this state, so return an error. */
1209 return GST_FLOW_ERROR;
1210 }
1211 if (curl_info_dbl > src->timeout_secs) {
1212 return GST_FLOW_CUSTOM_ERROR;
1213 }
1214
1215 if (curl_easy_getinfo (src->curl_handle, CURLINFO_OS_ERRNO,
1216 &curl_info_long) != CURLE_OK) {
1217 /* Curl cannot be relied on in this state, so return an error. */
1218 return GST_FLOW_ERROR;
1219
1220 }
1221
1222 GST_WARNING_OBJECT (src, "Errno for CONNECT call was %ld (%s)",
1223 curl_info_long, g_strerror ((gint) curl_info_long));
1224
1225 /* Some of these responses are retry-able, others not. Set the returned
1226 * state to ERROR so we crash out instead of fruitlessly retrying.
1227 */
1228 if (curl_info_long == ECONNREFUSED) {
1229 return GST_FLOW_ERROR;
1230 }
1231 ret = GST_FLOW_CUSTOM_ERROR;
1232 }
1233
1234
1235 if (ret == GST_FLOW_CUSTOM_ERROR) {
1236 src->hdrs_updated = FALSE;
1237 GSTCURL_FUNCTION_EXIT (src);
1238 return ret;
1239 }
1240
1241 /* Only do this once */
1242 if (src->hdrs_updated == FALSE) {
1243 GSTCURL_FUNCTION_EXIT (src);
1244 return GST_FLOW_OK;
1245 }
1246
1247 /*
1248 * Deal with redirections...
1249 */
1250 if (curl_easy_getinfo (src->curl_handle, CURLINFO_EFFECTIVE_URL,
1251 &redirect_url)
1252 == CURLE_OK) {
1253 size_t lena, lenb;
1254 lena = strlen (src->uri);
1255 lenb = strlen (redirect_url);
1256 if (g_ascii_strncasecmp (src->uri, redirect_url,
1257 (lena > lenb) ? lenb : lena) != 0) {
1258 GST_INFO_OBJECT (src, "Got a redirect to %s, setting as redirect URI",
1259 redirect_url);
1260 src->redirect_uri = g_strdup (redirect_url);
1261 gst_structure_remove_field (src->http_headers, REDIRECT_URI_NAME);
1262 gst_structure_set (src->http_headers, REDIRECT_URI_NAME,
1263 G_TYPE_STRING, redirect_url, NULL);
1264 }
1265 }
1266
1267 /*
1268 * Push the content length
1269 */
1270 if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD,
1271 &curl_info_dbl) == CURLE_OK) {
1272 if (curl_info_dbl == -1) {
1273 GST_WARNING_OBJECT (src,
1274 "No Content-Length was specified in the response.");
1275 } else {
1276 GST_INFO_OBJECT (src, "Content-Length was given as %.0f", curl_info_dbl);
1277 basesrc = GST_BASE_SRC_CAST (src);
1278 basesrc->segment.duration = curl_info_dbl;
1279 gst_element_post_message (GST_ELEMENT (src),
1280 gst_message_new_duration_changed (GST_OBJECT (src)));
1281 }
1282 }
1283
1284 /*
1285 * Push all the received headers down via a sicky event
1286 */
1287 response_headers = gst_structure_get_value (src->http_headers,
1288 RESPONSE_HEADERS_NAME);
1289 if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
1290 GstEvent *hdrs_event;
1291
1292 gst_element_post_message (GST_ELEMENT_CAST (src),
1293 gst_message_new_element (GST_OBJECT_CAST (src),
1294 gst_structure_copy (src->http_headers)));
1295
1296 /* gst_event_new_custom takes ownership of our structure */
1297 hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
1298 gst_structure_copy (src->http_headers));
1299 gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
1300 GST_INFO_OBJECT (src, "Pushed headers downstream");
1301 }
1302
1303 src->hdrs_updated = FALSE;
1304
1305 GSTCURL_FUNCTION_EXIT (src);
1306
1307 return ret;
1308 }
1309
1310 /*
1311 * "Negotiate" capabilities between us and the sink.
1312 * I.e. tell the sink device what data to expect. We can't be told what to send
1313 * unless we implement "only return to me if this type" property. Potential TODO
1314 */
1315 static gboolean
gst_curl_http_src_negotiate_caps(GstCurlHttpSrc * src)1316 gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
1317 {
1318 const GValue *response_headers;
1319 const GstStructure *response_struct;
1320
1321 GST_INFO_OBJECT (src, "Negotiating caps...");
1322 if (src->caps && src->http_headers) {
1323 response_headers =
1324 gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME);
1325 if (!response_headers) {
1326 GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME);
1327 return FALSE;
1328 }
1329 response_struct = gst_value_get_structure (response_headers);
1330 if (gst_structure_has_field_typed (response_struct, "content-type",
1331 G_TYPE_STRING)) {
1332 const gchar *content_type =
1333 gst_structure_get_string (response_struct, "content-type");
1334 GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type);
1335 src->caps = gst_caps_make_writable (src->caps);
1336 gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
1337 content_type, NULL);
1338 if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
1339 GST_ERROR_OBJECT (src, "Setting caps failed!");
1340 return FALSE;
1341 }
1342 }
1343 } else {
1344 GST_DEBUG_OBJECT (src, "No caps have been set, continue.");
1345 }
1346
1347 return TRUE;
1348 }
1349
1350 /*
1351 * Cleanup the CURL easy handle once we're done with it.
1352 */
1353 static inline void
gst_curl_http_src_destroy_easy_handle(GstCurlHttpSrc * src)1354 gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src)
1355 {
1356 /* Thank you Handles, and well done. Well done, mate. */
1357 if (src->curl_handle != NULL) {
1358 curl_easy_cleanup (src->curl_handle);
1359 src->curl_handle = NULL;
1360 }
1361 /* In addition, clean up the curl header slist if it was used. */
1362 if (src->slist != NULL) {
1363 curl_slist_free_all (src->slist);
1364 src->slist = NULL;
1365 }
1366 }
1367
1368 static GstStateChangeReturn
gst_curl_http_src_change_state(GstElement * element,GstStateChange transition)1369 gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
1370 {
1371 GstStateChangeReturn ret;
1372 GstCurlHttpSrc *source = GST_CURLHTTPSRC (element);
1373 GSTCURL_FUNCTION_ENTRY (source);
1374
1375 switch (transition) {
1376 case GST_STATE_CHANGE_NULL_TO_READY:
1377 gst_curl_http_src_ref_multi (source);
1378 break;
1379 case GST_STATE_CHANGE_READY_TO_PAUSED:
1380 if (source->uri == NULL) {
1381 GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (_("No URL set.")),
1382 ("Missing URL"));
1383 return GST_STATE_CHANGE_FAILURE;
1384 }
1385 break;
1386 case GST_STATE_CHANGE_READY_TO_NULL:
1387 GST_DEBUG_OBJECT (source, "Removing from multi_loop queue...");
1388 /* The pipeline has ended, so signal any running request to end
1389 and wait until the multi_loop has stopped using this element */
1390 gst_curl_http_src_wait_until_removed (source);
1391 gst_curl_http_src_unref_multi (source);
1392 break;
1393 default:
1394 break;
1395 }
1396
1397 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1398
1399 GSTCURL_FUNCTION_EXIT (source);
1400 return ret;
1401 }
1402
1403 /*
1404 * Take care of any memory that may be left over from the instance that's now
1405 * closing before we leak it.
1406 */
1407 static void
gst_curl_http_src_cleanup_instance(GstCurlHttpSrc * src)1408 gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
1409 {
1410 gint i;
1411 g_mutex_lock (&src->uri_mutex);
1412 g_free (src->uri);
1413 src->uri = NULL;
1414 g_free (src->redirect_uri);
1415 src->redirect_uri = NULL;
1416 g_mutex_unlock (&src->uri_mutex);
1417 g_mutex_clear (&src->uri_mutex);
1418
1419 g_free (src->proxy_uri);
1420 src->proxy_uri = NULL;
1421 g_free (src->no_proxy_list);
1422 src->no_proxy_list = NULL;
1423 g_free (src->proxy_user);
1424 src->proxy_user = NULL;
1425 g_free (src->proxy_pass);
1426 src->proxy_pass = NULL;
1427
1428 for (i = 0; i < src->number_cookies; i++) {
1429 g_free (src->cookies[i]);
1430 src->cookies[i] = NULL;
1431 }
1432 g_free (src->cookies);
1433 src->cookies = NULL;
1434
1435 g_free (src->user_agent);
1436 src->user_agent = NULL;
1437
1438 g_mutex_clear (&src->buffer_mutex);
1439
1440 g_cond_clear (&src->buffer_cond);
1441
1442 g_free (src->buffer);
1443 src->buffer = NULL;
1444
1445 if (src->request_headers) {
1446 gst_structure_free (src->request_headers);
1447 src->request_headers = NULL;
1448 }
1449 if (src->http_headers != NULL) {
1450 gst_structure_free (src->http_headers);
1451 src->http_headers = NULL;
1452 }
1453 g_free (src->reason_phrase);
1454 src->reason_phrase = NULL;
1455 gst_caps_replace (&src->caps, NULL);
1456
1457 gst_curl_http_src_destroy_easy_handle (src);
1458 }
1459
1460 static gboolean
gst_curl_http_src_query(GstBaseSrc * bsrc,GstQuery * query)1461 gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
1462 {
1463 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1464 gboolean ret;
1465 GSTCURL_FUNCTION_ENTRY (src);
1466
1467 switch (GST_QUERY_TYPE (query)) {
1468 case GST_QUERY_URI:
1469 g_mutex_lock (&src->uri_mutex);
1470 gst_query_set_uri (query, src->uri);
1471 if (src->redirect_uri != NULL) {
1472 gst_query_set_uri_redirection (query, src->redirect_uri);
1473 }
1474 g_mutex_unlock (&src->uri_mutex);
1475 ret = TRUE;
1476 break;
1477 default:
1478 ret = GST_BASE_SRC_CLASS (parent_class)->query (bsrc, query);
1479 break;
1480 }
1481
1482 GSTCURL_FUNCTION_EXIT (src);
1483 return ret;
1484 }
1485
1486 static gboolean
gst_curl_http_src_get_content_length(GstBaseSrc * bsrc,guint64 * size)1487 gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
1488 {
1489 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1490 const GValue *response_headers;
1491 gboolean ret = FALSE;
1492
1493 if (src->http_headers == NULL) {
1494 return FALSE;
1495 }
1496
1497 response_headers = gst_structure_get_value (src->http_headers,
1498 RESPONSE_HEADERS_NAME);
1499 if (gst_structure_has_field_typed (gst_value_get_structure (response_headers),
1500 "content-length", G_TYPE_STRING)) {
1501 const gchar *content_length =
1502 gst_structure_get_string (gst_value_get_structure (response_headers),
1503 "content-length");
1504 *size = (guint64) g_ascii_strtoull (content_length, NULL, 10);
1505 ret = TRUE;
1506 } else {
1507 GST_DEBUG_OBJECT (src,
1508 "No content length has yet been set, or there was an error!");
1509 }
1510 return ret;
1511 }
1512
1513 static void
gst_curl_http_src_uri_handler_init(gpointer g_iface,gpointer iface_data)1514 gst_curl_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1515 {
1516 GstURIHandlerInterface *uri_iface = (GstURIHandlerInterface *) g_iface;
1517
1518 uri_iface->get_type = gst_curl_http_src_urihandler_get_type;
1519 uri_iface->get_protocols = gst_curl_http_src_urihandler_get_protocols;
1520 uri_iface->get_uri = gst_curl_http_src_urihandler_get_uri;
1521 uri_iface->set_uri = gst_curl_http_src_urihandler_set_uri;
1522 }
1523
1524 static guint
gst_curl_http_src_urihandler_get_type(GType type)1525 gst_curl_http_src_urihandler_get_type (GType type)
1526 {
1527 return GST_URI_SRC;
1528 }
1529
1530 static const gchar *const *
gst_curl_http_src_urihandler_get_protocols(GType type)1531 gst_curl_http_src_urihandler_get_protocols (GType type)
1532 {
1533 static const gchar *protocols[] = { "http", "https", NULL };
1534
1535 return protocols;
1536 }
1537
1538 static gchar *
gst_curl_http_src_urihandler_get_uri(GstURIHandler * handler)1539 gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler)
1540 {
1541 gchar *ret;
1542 GstCurlHttpSrc *source;
1543
1544 g_return_val_if_fail (GST_IS_URI_HANDLER (handler), NULL);
1545 source = GST_CURLHTTPSRC (handler);
1546
1547 GSTCURL_FUNCTION_ENTRY (source);
1548
1549 g_mutex_lock (&source->uri_mutex);
1550 ret = g_strdup (source->uri);
1551 g_mutex_unlock (&source->uri_mutex);
1552
1553 GSTCURL_FUNCTION_EXIT (source);
1554 return ret;
1555 }
1556
1557 static gboolean
gst_curl_http_src_urihandler_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)1558 gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
1559 const gchar * uri, GError ** error)
1560 {
1561 GstCurlHttpSrc *source = GST_CURLHTTPSRC (handler);
1562 GSTCURL_FUNCTION_ENTRY (source);
1563
1564 g_return_val_if_fail (GST_IS_URI_HANDLER (handler), FALSE);
1565 g_return_val_if_fail (uri != NULL, FALSE);
1566
1567 g_mutex_lock (&source->uri_mutex);
1568
1569 if (source->uri != NULL) {
1570 GST_DEBUG_OBJECT (source,
1571 "URI already present as %s, updating to new URI %s", source->uri, uri);
1572 g_free (source->uri);
1573 }
1574
1575 source->uri = g_strdup (uri);
1576 if (source->uri == NULL) {
1577 g_mutex_unlock (&source->uri_mutex);
1578 return FALSE;
1579 }
1580 source->retries_remaining = source->total_retries;
1581
1582 g_mutex_unlock (&source->uri_mutex);
1583
1584 GSTCURL_FUNCTION_EXIT (source);
1585 return TRUE;
1586 }
1587
1588 /*
1589 * Cancel any currently running transfer, and then signal all the loops to drop
1590 * any received buffers. The ::create() method should return GST_FLOW_FLUSHING.
1591 */
1592 static gboolean
gst_curl_http_src_unlock(GstBaseSrc * bsrc)1593 gst_curl_http_src_unlock (GstBaseSrc * bsrc)
1594 {
1595 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1596 gboolean want_removal = FALSE;
1597
1598 g_mutex_lock (&src->buffer_mutex);
1599 if (src->state != GSTCURL_UNLOCK) {
1600 if (src->state == GSTCURL_OK) {
1601 /* A transfer is running, cancel it */
1602 if (src->connection_status == GSTCURL_CONNECTED) {
1603 src->connection_status = GSTCURL_WANT_REMOVAL;
1604 }
1605 want_removal = TRUE;
1606 }
1607 src->pending_state = src->state;
1608 src->state = GSTCURL_UNLOCK;
1609 }
1610 g_cond_signal (&src->buffer_cond);
1611 g_mutex_unlock (&src->buffer_mutex);
1612
1613 if (want_removal) {
1614 GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
1615 GST_TYPE_CURL_HTTP_SRC,
1616 GstCurlHttpSrcClass);
1617 g_mutex_lock (&klass->multi_task_context.mutex);
1618 g_cond_signal (&klass->multi_task_context.signal);
1619 g_mutex_unlock (&klass->multi_task_context.mutex);
1620 }
1621
1622 return TRUE;
1623 }
1624
1625 /*
1626 * Finish the unlock request above and return curlhttpsrc to the normal state.
1627 * This will probably be GSTCURL_DONE, and the next return from ::create() will
1628 * be GST_FLOW_EOS as we don't want to deliver parts of a HTTP body.
1629 */
1630 static gboolean
gst_curl_http_src_unlock_stop(GstBaseSrc * bsrc)1631 gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
1632 {
1633 GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1634
1635 g_mutex_lock (&src->buffer_mutex);
1636 src->state = src->pending_state;
1637 src->pending_state = GSTCURL_NONE;
1638 g_cond_signal (&src->buffer_cond);
1639 g_mutex_unlock (&src->buffer_mutex);
1640
1641 return TRUE;
1642 }
1643
1644 /*****************************************************************************
1645 * Curl loop task functions begin
1646 *****************************************************************************/
1647 static void
gst_curl_http_src_curl_multi_loop(gpointer thread_data)1648 gst_curl_http_src_curl_multi_loop (gpointer thread_data)
1649 {
1650 GstCurlHttpSrcMultiTaskContext *context;
1651 GstCurlHttpSrcQueueElement *qelement, *qnext;
1652 gint i, still_running = 0;
1653 CURLMsg *curl_message;
1654 GstCurlHttpSrc *elt;
1655 guint active = 0;
1656
1657 context = (GstCurlHttpSrcMultiTaskContext *) thread_data;
1658
1659 g_mutex_lock (&context->mutex);
1660
1661 /* Someone is holding a reference to us, but isn't using us so to avoid
1662 * unnecessary clock cycle wasting, sit in a conditional wait until woken.
1663 */
1664 while (context->queue == NULL
1665 && context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
1666 GSTCURL_DEBUG_PRINT ("Waiting for an element to be added...");
1667 g_cond_wait (&context->signal, &context->mutex);
1668 GSTCURL_DEBUG_PRINT ("Received wake up call!");
1669 }
1670 if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
1671 GSTCURL_INFO_PRINT ("Got instruction to shut down");
1672 goto out;
1673 }
1674
1675 /* check for elements that need to be started or removed */
1676 qelement = context->queue;
1677 while (qelement != NULL) {
1678 qnext = qelement->next;
1679 elt = qelement->p;
1680 /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
1681 needed, multi_task_context.mutex must be acquired first */
1682 g_mutex_lock (&elt->buffer_mutex);
1683 if (elt->connection_status == GSTCURL_WANT_REMOVAL) {
1684 curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
1685 if (elt->state == GSTCURL_UNLOCK) {
1686 elt->pending_state = GSTCURL_REMOVED;
1687 } else {
1688 elt->state = GSTCURL_REMOVED;
1689 }
1690 elt->connection_status = GSTCURL_NOT_CONNECTED;
1691 gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
1692 g_cond_signal (&elt->buffer_cond);
1693 } else if (elt->connection_status == GSTCURL_CONNECTED) {
1694 active++;
1695 if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) {
1696 GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
1697 curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
1698 }
1699 }
1700 g_mutex_unlock (&elt->buffer_mutex);
1701 qelement = qnext;
1702 }
1703
1704 if (active == 0) {
1705 GSTCURL_DEBUG_PRINT ("No active elements");
1706 goto out;
1707 }
1708
1709 /* perform a select() on all of the active sockets and process any
1710 messages from curl */
1711 {
1712 struct timeval timeout;
1713 gint rc;
1714 fd_set fdread, fdwrite, fdexcep;
1715 int maxfd = -1;
1716 long curl_timeo = -1;
1717 gboolean cond = FALSE;
1718
1719 /* Because curl can possibly take some time here, be nice and let go of the
1720 * mutex so other threads can perform state/queue operations as we don't
1721 * care about those until the end of this. */
1722 g_mutex_unlock (&context->mutex);
1723
1724 FD_ZERO (&fdread);
1725 FD_ZERO (&fdwrite);
1726 FD_ZERO (&fdexcep);
1727
1728 timeout.tv_sec = 1;
1729 timeout.tv_usec = 0;
1730
1731 curl_multi_timeout (context->multi_handle, &curl_timeo);
1732 if (curl_timeo >= 0) {
1733 timeout.tv_sec = curl_timeo / 1000;
1734 if (timeout.tv_sec > 1) {
1735 timeout.tv_sec = 1;
1736 } else {
1737 timeout.tv_usec = (curl_timeo % 1000) * 1000;
1738 }
1739 }
1740
1741 /* get file descriptors from the transfers */
1742 curl_multi_fdset (context->multi_handle, &fdread, &fdwrite, &fdexcep,
1743 &maxfd);
1744
1745 rc = select (maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);
1746
1747 switch (rc) {
1748 case -1:
1749 /* select error */
1750 break;
1751 case 0:
1752 default:
1753 /* timeout or readable/writable sockets */
1754 curl_multi_perform (context->multi_handle, &still_running);
1755 break;
1756 }
1757
1758 g_mutex_lock (&context->mutex);
1759
1760 /*
1761 * Check the CURL message buffer to find out if any transfers have
1762 * completed. If they have, call the signal_finished function which
1763 * will signal the g_cond_wait call in that calling instance.
1764 */
1765 i = 0;
1766 while (cond != TRUE) {
1767 curl_message = curl_multi_info_read (context->multi_handle, &i);
1768 if (curl_message == NULL) {
1769 cond = TRUE;
1770 } else if (curl_message->msg == CURLMSG_DONE) {
1771 /* A hack, but I have seen curl_message->easy_handle being
1772 * NULL randomly, so check for that. */
1773 if (curl_message->easy_handle != NULL) {
1774 curl_multi_remove_handle (context->multi_handle,
1775 curl_message->easy_handle);
1776 gst_curl_http_src_remove_queue_handle (&context->queue,
1777 curl_message->easy_handle, curl_message->data.result);
1778 }
1779 }
1780 }
1781 }
1782 out:
1783 g_mutex_unlock (&context->mutex);
1784 }
1785
1786 /*
1787 * Receive headers from the remote server and put them into the http_headers
1788 * structure to be sent downstream when we've got them all and started receiving
1789 * the body (see ::_handle_response())
1790 */
1791 static size_t
gst_curl_http_src_get_header(void * header,size_t size,size_t nmemb,void * src)1792 gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
1793 void *src)
1794 {
1795 GstCurlHttpSrc *s = src;
1796 char *substr;
1797
1798 GST_DEBUG_OBJECT (s, "Received header: %s", (char *) header);
1799
1800 g_mutex_lock (&s->buffer_mutex);
1801
1802 if (s->state == GSTCURL_UNLOCK) {
1803 g_mutex_unlock (&s->buffer_mutex);
1804 return size * nmemb;
1805 }
1806
1807 if (s->http_headers == NULL) {
1808 /* Can't do anything here, so just silently swallow the header */
1809 GST_DEBUG_OBJECT (s, "HTTP Headers Structure has already been sent,"
1810 " ignoring header");
1811 g_mutex_unlock (&s->buffer_mutex);
1812 return size * nmemb;
1813 }
1814
1815 substr = gst_curl_http_src_strcasestr (header, "HTTP");
1816 if (substr == header) {
1817 /* We have a status line! */
1818 gchar **status_line_fields;
1819
1820 /* Have we already seen a status line? If so, delete any response headers */
1821 if (s->status_code > 0) {
1822 GstStructure *empty_headers =
1823 gst_structure_new_empty (RESPONSE_HEADERS_NAME);
1824 gst_structure_remove_field (s->http_headers, RESPONSE_HEADERS_NAME);
1825 gst_structure_set (s->http_headers, RESPONSE_HEADERS_NAME,
1826 GST_TYPE_STRUCTURE, empty_headers, NULL);
1827 gst_structure_free (empty_headers);
1828
1829 }
1830
1831 /* Process the status line */
1832 status_line_fields = g_strsplit ((gchar *) header, " ", 3);
1833 if (status_line_fields == NULL) {
1834 GST_ERROR_OBJECT (s, "Status line processing failed!");
1835 } else {
1836 s->status_code =
1837 (guint) g_ascii_strtoll (status_line_fields[1], NULL, 10);
1838 g_free (s->reason_phrase);
1839 s->reason_phrase = g_strdup (status_line_fields[2]);
1840 GST_INFO_OBJECT (s, "Received status %u for request for URI %s: %s",
1841 s->status_code, s->uri, s->reason_phrase);
1842 gst_structure_set (s->http_headers, HTTP_STATUS_CODE,
1843 G_TYPE_UINT, s->status_code, NULL);
1844 g_strfreev (status_line_fields);
1845 }
1846 } else {
1847 /* Normal header line */
1848 gchar **header_tpl = g_strsplit ((gchar *) header, ": ", 2);
1849 if (header_tpl == NULL) {
1850 GST_ERROR_OBJECT (s, "Header processing failed! (%s)", (gchar *) header);
1851 } else {
1852 const GValue *gv_resp_hdrs = gst_structure_get_value (s->http_headers,
1853 RESPONSE_HEADERS_NAME);
1854 const GstStructure *response_headers =
1855 gst_value_get_structure (gv_resp_hdrs);
1856 /* Store header key lower case (g_ascii_strdown), makes searching through
1857 * later on easier - end applications shouldn't care, as all HTTP headers
1858 * are case-insensitive */
1859 gchar *header_key = g_ascii_strdown (header_tpl[0], -1);
1860 gchar *header_value;
1861
1862 /* If header field already exists, append to the end */
1863 if (gst_structure_has_field (response_headers, header_key) == TRUE) {
1864 header_value = g_strdup_printf ("%s, %s",
1865 gst_structure_get_string (response_headers, header_key),
1866 header_tpl[1]);
1867 gst_structure_set ((GstStructure *) response_headers, header_key,
1868 G_TYPE_STRING, header_value, NULL);
1869 g_free (header_value);
1870 } else {
1871 header_value = header_tpl[1];
1872 gst_structure_set ((GstStructure *) response_headers, header_key,
1873 G_TYPE_STRING, header_value, NULL);
1874 }
1875
1876 /* We have some special cases - deal with them here */
1877 if (g_strcmp0 (header_key, "content-type") == 0) {
1878 gst_curl_http_src_negotiate_caps (src);
1879 }
1880
1881 g_free (header_key);
1882 g_strfreev (header_tpl);
1883 }
1884 }
1885
1886 s->hdrs_updated = TRUE;
1887
1888 g_mutex_unlock (&s->buffer_mutex);
1889
1890 return size * nmemb;
1891 }
1892
/*
 * Case-insensitive substring search, standing in for strcasestr(), which is
 * a GNU extension (i.e. not portable) and not always guaranteed to be
 * available.
 *
 * haystack: NUL-terminated string to search in.
 * needle:   NUL-terminated string to search for.
 *
 * Returns a pointer to the FIRST occurrence of needle in haystack, compared
 * character by character with tolower(), or NULL if there is none. An empty
 * needle matches at the start of haystack. A match that ends exactly at the
 * end of haystack (including haystack and needle being equal) is found like
 * any other.
 */
static char *
gst_curl_http_src_strcasestr (const char *haystack, const char *needle)
{
  size_t needle_len = strlen (needle);
  const char *start;

  if (needle_len == 0) {
    return (char *) haystack;
  }

  for (start = haystack; *start != '\0'; start++) {
    size_t k = 0;

    /* If haystack ends first, its '\0' cannot equal the (non-NUL) needle
     * character, so this never reads beyond the terminator. The unsigned
     * char casts keep tolower() defined for bytes >= 0x80. */
    while (k < needle_len &&
        tolower ((unsigned char) start[k]) ==
        tolower ((unsigned char) needle[k])) {
      k++;
    }

    if (k == needle_len) {
      return (char *) start;
    }
  }

  return NULL;
}
1927
1928 /*
1929 * Receive chunks of the requested body and pass these back to the ::create()
1930 * loop
1931 */
1932 static size_t
gst_curl_http_src_get_chunks(void * chunk,size_t size,size_t nmemb,void * src)1933 gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
1934 {
1935 GstCurlHttpSrc *s = src;
1936 size_t chunk_len = size * nmemb;
1937 GST_TRACE_OBJECT (s,
1938 "Received curl chunk for URI %s of size %d", s->uri, (int) chunk_len);
1939 g_mutex_lock (&s->buffer_mutex);
1940 if (s->state == GSTCURL_UNLOCK) {
1941 g_mutex_unlock (&s->buffer_mutex);
1942 return chunk_len;
1943 }
1944 s->buffer =
1945 g_realloc (s->buffer, (s->buffer_len + chunk_len + 1) * sizeof (char));
1946 if (s->buffer == NULL) {
1947 GST_ERROR_OBJECT (s, "Realloc for cURL response message failed!\n");
1948 return 0;
1949 }
1950 memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
1951 s->buffer_len += chunk_len;
1952 g_cond_signal (&s->buffer_cond);
1953 g_mutex_unlock (&s->buffer_mutex);
1954 return chunk_len;
1955 }
1956
1957 /*
1958 * Request a cancellation of a currently running curl handle.
1959 */
1960 static void
gst_curl_http_src_request_remove(GstCurlHttpSrc * src)1961 gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
1962 {
1963 GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
1964 GST_TYPE_CURL_HTTP_SRC,
1965 GstCurlHttpSrcClass);
1966
1967 g_mutex_lock (&klass->multi_task_context.mutex);
1968 g_mutex_lock (&src->buffer_mutex);
1969 if (src->connection_status == GSTCURL_CONNECTED) {
1970 src->connection_status = GSTCURL_WANT_REMOVAL;
1971 }
1972 g_mutex_unlock (&src->buffer_mutex);
1973 g_cond_signal (&klass->multi_task_context.signal);
1974 g_mutex_unlock (&klass->multi_task_context.mutex);
1975 }
1976
1977 /*
1978 * Request a cancellation of a currently running curl handle and
1979 * block this thread until the src element has been removed
1980 * from the queue
1981 */
1982 static void
gst_curl_http_src_wait_until_removed(GstCurlHttpSrc * src)1983 gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src)
1984 {
1985 gst_curl_http_src_request_remove (src);
1986 g_mutex_lock (&src->buffer_mutex);
1987 while (src->connection_status != GSTCURL_NOT_CONNECTED) {
1988 g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
1989 }
1990 g_mutex_unlock (&src->buffer_mutex);
1991 }
1992