1 /*
2  * GstCurlHttpSrc
3  * Copyright 2017 British Broadcasting Corporation - Research and Development
4  *
5  * Author: Sam Hurst <samuelh@rd.bbc.co.uk>
6  *
7  * Permission is hereby granted, free of charge, to any person obtaining a
8  * copy of this software and associated documentation files (the "Software"),
9  * to deal in the Software without restriction, including without limitation
10  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
11  * and/or sell copies of the Software, and to permit persons to whom the
12  * Software is furnished to do so, subject to the following conditions:
13  *
14  * The above copyright notice and this permission notice shall be included in
15  * all copies or substantial portions of the Software.
16  *
17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
22  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
23  * DEALINGS IN THE SOFTWARE.
24  *
25  * Alternatively, the contents of this file may be used under the
26  * GNU Lesser General Public License Version 2.1 (the "LGPL"), in
27  * which case the following provisions apply instead of the ones
28  * mentioned above:
29  *
30  * This library is free software; you can redistribute it and/or
31  * modify it under the terms of the GNU Library General Public
32  * License as published by the Free Software Foundation; either
33  * version 2 of the License, or (at your option) any later version.
34  *
35  * This library is distributed in the hope that it will be useful,
36  * but WITHOUT ANY WARRANTY; without even the implied warranty of
37  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
38  * Library General Public License for more details.
39  *
40  * You should have received a copy of the GNU Library General Public
41  * License along with this library; if not, write to the
42  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
43  * Boston, MA 02111-1307, USA.
44  */
45 
46 /**
47  * SECTION:element-curlhttpsrc
48  *
49  * This plugin reads data from a remote location specified by a URI, when the
50  * protocol is 'http' or 'https'.
51  *
52  * It is based on the cURL project (http://curl.haxx.se/) and is specifically
53  * designed to be also used with nghttp2 (http://nghttp2.org) to enable HTTP/2
54  * support for GStreamer. Your libcurl library MUST be compiled against nghttp2
55  * for HTTP/2 support for this functionality. HTTPS support is dependent on
56  * cURL being built with SSL support (OpenSSL/PolarSSL/NSS/GnuTLS).
57  *
58  * An HTTP proxy must be specified by URL.
59  * If the "http_proxy" environment variable is set, its value is used.
60  * The #GstCurlHttpSrc:proxy property can be used to override the default.
61  *
62  * <refsect2>
63  * <title>Example launch line</title>
64  * |[
65  * gst-launch-1.0 curlhttpsrc location=http://127.0.1.1/index.html ! fakesink dump=1
66  * ]| The above pipeline reads a web page from the local machine using HTTP and
67  * dumps it to stdout.
68  * |[
69  * gst-launch-1.0 playbin uri=http://rdmedia.bbc.co.uk/dash/testmpds/multiperiod/bbb.php
70  * ]| The above pipeline will start up a DASH streaming session from the given
71  * MPD file. This requires GStreamer to have been built with dashdemux from
72  * gst-plugins-bad.
73  * </refsect2>
74  */
75 
76 /*
77  * Thread safety notes.
78  *
79  * GstCurlHttpSrc uses a single thread running the
80  * gst_curl_http_src_curl_multi_loop() function to handle receiving
81  * data and messages from libcurl. Each instance of GstCurlHttpSrc adds
82  * an entry into a queue in GstCurlHttpSrcMultiTaskContext and waits
83  * for the multi_loop to perform the HTTP request.
84  *
85  * When an instance of GstCurlHttpSrc wants to make a request (i.e.
86  * it has moved to the PLAYING state) it adds itself to the
87  * multi_task_context.queue list and signals the multi_loop task.
88  *
89  * Each instance of GstCurlHttpSrc uses buffer_mutex and buffer_cond
90  * to wait for gst_curl_http_src_curl_multi_loop() to perform the
91  * request and signal completion.
92  *
93  * Each instance of GstCurlHttpSrc is protected by the mutexes:
94  * 1. uri_mutex
95  * 2. buffer_mutex
96  *
97  * uri_mutex is used to protect access to the uri field.
98  *
99  * buffer_mutex is used to protect access to buffer_cond, state and
100  * connection_status.
101  *
102  * The gst_curl_http_src_curl_multi_loop() function uses the mutexes:
103  * 1. multi_task_context.task_rec_mutex
104  * 2. multi_task_context.mutex
105  *
106  * multi_task_context.task_rec_mutex is only used by GstTask.
107  *
108  * multi_task_context.mutex is used to protect access to queue and state
109  *
110  * To avoid deadlock, it is vital that if both multi_task_context.mutex
111  * and buffer_mutex are required, that they are locked in the order:
112  * 1. multi_task_context.mutex
113  * 2. buffer_mutex
114  */
115 
116 #ifdef HAVE_CONFIG_H
117 #include <config.h>
118 #endif
119 
120 #include <gst/gst-i18n-plugin.h>
121 
122 #include "gstcurlhttpsrc.h"
123 #include "gstcurlqueue.h"
124 #include "gstcurldefaults.h"
125 
/* Debug category for the element itself; also used as the file-wide default. */
GST_DEBUG_CATEGORY_STATIC (gst_curl_http_src_debug);
#define GST_CAT_DEFAULT gst_curl_http_src_debug
/* Separate debug category, initialised in class_init as "curl_multi_loop". */
GST_DEBUG_CATEGORY_STATIC (gst_curl_loop_debug);

/*
 * Post an element error for src via GST_ELEMENT_ERROR_WITH_DETAILS.
 *
 * error_message becomes the error text. The debug text is built from the
 * reason_phrase, status_code, uri and redirect_uri fields of src, and the
 * status code and redirect URI are also attached as the
 * "http-status-code" and "http-redirect-uri" details.
 *
 * Note that src is expanded several times, so pass an expression without
 * side effects.
 */
#define CURL_HTTP_SRC_ERROR(src,cat,code,error_message)     \
  do { \
    GST_ELEMENT_ERROR_WITH_DETAILS ((src), cat, code, ("%s", error_message), \
        ("%s (%d), URL: %s, Redirect to: %s", (src)->reason_phrase, \
            (src)->status_code, (src)->uri, GST_STR_NULL ((src)->redirect_uri)), \
            ("http-status-code", G_TYPE_UINT, (src)->status_code, \
             "http-redirect-uri", G_TYPE_STRING, GST_STR_NULL ((src)->redirect_uri), NULL)); \
  } while(0)
138 
/*
 * GObject property identifiers. These are the prop_id values dispatched on
 * in gst_curl_http_src_set_property() and gst_curl_http_src_get_property().
 *
 * PROP_0 is the unused zero slot and PROP_MAX marks the end of the list.
 * NOTE(review): PROP_IRADIO_MODE has no matching property installed or
 * handled in the parts of this file reviewed here - confirm whether it is
 * still needed.
 */
enum
{
  PROP_0,
  PROP_URI,
  PROP_USERNAME,
  PROP_PASSWORD,
  PROP_PROXYURI,
  PROP_PROXYUSERNAME,
  PROP_PROXYPASSWORD,
  PROP_COOKIES,
  PROP_USERAGENT,
  PROP_HEADERS,
  PROP_COMPRESS,
  PROP_REDIRECT,
  PROP_MAXREDIRECT,
  PROP_KEEPALIVE,
  PROP_TIMEOUT,
  PROP_STRICT_SSL,
  PROP_SSL_CA_FILE,
  PROP_RETRIES,
  PROP_CONNECTIONMAXTIME,
  PROP_MAXCONCURRENT_SERVER,
  PROP_MAXCONCURRENT_PROXY,
  PROP_MAXCONCURRENT_GLOBAL,
  PROP_HTTPVERSION,
  PROP_IRADIO_MODE,
  PROP_MAX
};
167 
/*
 * Source pad template used to push out received data: a single, always
 * present pad named "src" that places no restriction on caps. It is added
 * to the element class in gst_curl_http_src_class_init().
 */
static GstStaticPadTemplate srcpadtemplate = GST_STATIC_PAD_TEMPLATE ("src",
    GST_PAD_SRC,
    GST_PAD_ALWAYS,
    GST_STATIC_CAPS_ANY);
175 
/*
 * Function Declarations
 *
 * Forward declarations for the static functions of this file, grouped by
 * the role they play.
 */
/* GStreamer generic element functions (GObject, GstElement, GstBaseSrc and
 * GstPushSrc hooks plus their helpers) */
static void gst_curl_http_src_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_curl_http_src_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static void gst_curl_http_src_ref_multi (GstCurlHttpSrc * src);
static void gst_curl_http_src_unref_multi (GstCurlHttpSrc * src);
static void gst_curl_http_src_finalize (GObject * obj);
static GstFlowReturn gst_curl_http_src_create (GstPushSrc * psrc,
    GstBuffer ** outbuf);
static GstFlowReturn gst_curl_http_src_handle_response (GstCurlHttpSrc * src);
static gboolean gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src);
static GstStateChangeReturn gst_curl_http_src_change_state (GstElement *
    element, GstStateChange transition);
static void gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src);
static gboolean gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query);
static gboolean gst_curl_http_src_get_content_length (GstBaseSrc * bsrc,
    guint64 * size);
static gboolean gst_curl_http_src_unlock (GstBaseSrc * bsrc);
static gboolean gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc);

/* URI Handler functions (GstURIHandler interface implementation) */
static void gst_curl_http_src_uri_handler_init (gpointer g_iface,
    gpointer iface_data);
static guint gst_curl_http_src_urihandler_get_type (GType type);
static const gchar *const *gst_curl_http_src_urihandler_get_protocols (GType
    type);
static gchar *gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler);
static gboolean gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
    const gchar * uri, GError ** error);

/* GstTask functions (the curl multi worker loop and its libcurl helpers) */
static void gst_curl_http_src_curl_multi_loop (gpointer thread_data);
static CURL *gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s);
static inline void gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src);
static size_t gst_curl_http_src_get_header (void *header, size_t size,
    size_t nmemb, void *src);
static size_t gst_curl_http_src_get_chunks (void *chunk, size_t size,
    size_t nmemb, void *src);
static void gst_curl_http_src_request_remove (GstCurlHttpSrc * src);
static void gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src);
static char *gst_curl_http_src_strcasestr (const char *haystack,
    const char *needle);

/* libcurl version/feature information, filled in by class_init from
 * curl_version_info (CURLVERSION_NOW) */
static curl_version_info_data *gst_curl_http_src_curl_capabilities = NULL;
/* HTTP version given to new instances by default; chosen in class_init from
 * the libcurl feature set and the GST_CURL_HTTP_VER environment variable */
static GstCurlHttpVersion pref_http_ver;

/* GType of the enum backing the "http-version" property */
#define GST_TYPE_CURL_HTTP_VERSION (gst_curl_http_version_get_type ())
227 
228 static GType
gst_curl_http_version_get_type(void)229 gst_curl_http_version_get_type (void)
230 {
231   static GType gtype = 0;
232 
233   if (!gtype) {
234     static const GEnumValue http_versions[] = {
235       {GSTCURL_HTTP_VERSION_1_0, "HTTP Version 1.0", "1.0"},
236       {GSTCURL_HTTP_VERSION_1_1, "HTTP Version 1.1", "1.1"},
237 #ifdef CURL_VERSION_HTTP2
238       {GSTCURL_HTTP_VERSION_2_0, "HTTP Version 2.0", "2.0"},
239 #endif
240       {0, NULL, NULL}
241     };
242     gtype = g_enum_register_static ("GstCurlHttpVersionType", http_versions);
243   }
244   return gtype;
245 }
246 
/*
 * Register the GstCurlHttpSrc type as a GstPushSrc subclass that also
 * implements the GstURIHandler interface (set up by
 * gst_curl_http_src_uri_handler_init). The #define makes the parent class
 * pointer generated by the type macro available as plain "parent_class".
 */
#define gst_curl_http_src_parent_class parent_class
G_DEFINE_TYPE_WITH_CODE (GstCurlHttpSrc, gst_curl_http_src, GST_TYPE_PUSH_SRC,
    G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
        gst_curl_http_src_uri_handler_init));
251 
252 static void
gst_curl_http_src_class_init(GstCurlHttpSrcClass * klass)253 gst_curl_http_src_class_init (GstCurlHttpSrcClass * klass)
254 {
255   GObjectClass *gobject_class;
256   GstElementClass *gstelement_class;
257   GstBaseSrcClass *gstbasesrc_class;
258   GstPushSrcClass *gstpushsrc_class;
259   const gchar *http_env;
260   GstCurlHttpVersion default_http_version;
261 
262   gobject_class = (GObjectClass *) klass;
263   gstelement_class = (GstElementClass *) klass;
264   gstbasesrc_class = (GstBaseSrcClass *) klass;
265   gstpushsrc_class = (GstPushSrcClass *) klass;
266 
267   GST_DEBUG_CATEGORY_INIT (gst_curl_http_src_debug, "curlhttpsrc",
268       0, "UriHandler for libcURL");
269 
270   GST_INFO_OBJECT (klass, "class_init started!");
271 
272   gstelement_class->change_state =
273       GST_DEBUG_FUNCPTR (gst_curl_http_src_change_state);
274   gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_curl_http_src_create);
275   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_curl_http_src_query);
276   gstbasesrc_class->get_size =
277       GST_DEBUG_FUNCPTR (gst_curl_http_src_get_content_length);
278   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock);
279   gstbasesrc_class->unlock_stop =
280       GST_DEBUG_FUNCPTR (gst_curl_http_src_unlock_stop);
281 
282   gst_element_class_add_pad_template (gstelement_class,
283       gst_static_pad_template_get (&srcpadtemplate));
284 
285   gst_curl_http_src_curl_capabilities = curl_version_info (CURLVERSION_NOW);
286 #ifdef CURL_VERSION_HTTP2
287   if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
288     default_http_version = GSTCURL_HTTP_VERSION_2_0;
289   } else
290 #endif
291     default_http_version = GSTCURL_HTTP_VERSION_1_1;
292 
293   http_env = g_getenv ("GST_CURL_HTTP_VER");
294   if (http_env != NULL) {
295     GST_INFO_OBJECT (klass, "Seen env var GST_CURL_HTTP_VER with value %s",
296         http_env);
297     if (!strcmp (http_env, "1.0")) {
298       pref_http_ver = GSTCURL_HTTP_VERSION_1_0;
299     } else if (!strcmp (http_env, "1.1")) {
300       pref_http_ver = GSTCURL_HTTP_VERSION_1_1;
301     } else if (!strcmp (http_env, "2.0")) {
302 #ifdef CURL_VERSION_HTTP2
303       if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
304         pref_http_ver = GSTCURL_HTTP_VERSION_2_0;
305       } else {
306         goto unsupported_http_version;
307       }
308 #endif
309     } else {
310     unsupported_http_version:
311       GST_WARNING_OBJECT (klass,
312           "Unsupported HTTP version: %s. Fallback to default", http_env);
313       pref_http_ver = default_http_version;
314     }
315   } else {
316     pref_http_ver = default_http_version;
317   }
318 
319   gobject_class->set_property = gst_curl_http_src_set_property;
320   gobject_class->get_property = gst_curl_http_src_get_property;
321   gobject_class->finalize = gst_curl_http_src_finalize;
322 
323   g_object_class_install_property (gobject_class, PROP_URI,
324       g_param_spec_string ("location", "Location", "URI of resource to read",
325           GSTCURL_HANDLE_DEFAULT_CURLOPT_URL,
326           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327 
328   g_object_class_install_property (gobject_class, PROP_USERNAME,
329       g_param_spec_string ("user-id", "user-id",
330           "HTTP location URI user id for authentication",
331           GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME,
332           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
333 
334   g_object_class_install_property (gobject_class, PROP_PASSWORD,
335       g_param_spec_string ("user-pw", "user-pw",
336           "HTTP location URI password for authentication",
337           GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD,
338           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339 
340   g_object_class_install_property (gobject_class, PROP_PROXYURI,
341       g_param_spec_string ("proxy", "Proxy", "URI of HTTP proxy server",
342           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXY,
343           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344 
345   g_object_class_install_property (gobject_class, PROP_PROXYUSERNAME,
346       g_param_spec_string ("proxy-id", "proxy-id",
347           "HTTP proxy URI user id for authentication",
348           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYUSERNAME,
349           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350 
351   g_object_class_install_property (gobject_class, PROP_PROXYPASSWORD,
352       g_param_spec_string ("proxy-pw", "proxy-pw",
353           "HTTP proxy URI password for authentication",
354           GSTCURL_HANDLE_DEFAULT_CURLOPT_PROXYPASSWORD,
355           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
356 
357   g_object_class_install_property (gobject_class, PROP_COOKIES,
358       g_param_spec_boxed ("cookies", "Cookies", "List of HTTP Cookies",
359           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
360 
361   g_object_class_install_property (gobject_class, PROP_USERAGENT,
362       g_param_spec_string ("user-agent", "User-Agent",
363           "URI of resource requested",
364           GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/<curl-version>",
365           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366 
367   g_object_class_install_property (gobject_class, PROP_COMPRESS,
368       g_param_spec_boolean ("compress", "Compress",
369           "Allow compressed content encodings",
370           GSTCURL_HANDLE_DEFAULT_CURLOPT_ACCEPT_ENCODING,
371           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
372 
373   g_object_class_install_property (gobject_class, PROP_REDIRECT,
374       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
375           "Allow HTTP Redirections (HTTP Status Code 300 series)",
376           GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION,
377           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
378 
379   g_object_class_install_property (gobject_class, PROP_MAXREDIRECT,
380       g_param_spec_int ("max-redirect", "Max-Redirect",
381           "Maximum number of permitted redirections. -1 is unlimited.",
382           GSTCURL_HANDLE_MIN_CURLOPT_MAXREDIRS,
383           GSTCURL_HANDLE_MAX_CURLOPT_MAXREDIRS,
384           GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS,
385           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
386 
387   g_object_class_install_property (gobject_class, PROP_KEEPALIVE,
388       g_param_spec_boolean ("keep-alive", "Keep-Alive",
389           "Toggle keep-alive for connection reuse.",
390           GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392 
393   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
394       g_param_spec_int ("timeout", "Timeout",
395           "Value in seconds before timeout a blocking request (0 = no timeout)",
396           GSTCURL_HANDLE_MIN_CURLOPT_TIMEOUT,
397           GSTCURL_HANDLE_MAX_CURLOPT_TIMEOUT,
398           GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT,
399           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
400 
401   g_object_class_install_property (gobject_class, PROP_HEADERS,
402       g_param_spec_boxed ("extra-headers", "Extra Headers",
403           "Extra headers to append to the HTTP request",
404           GST_TYPE_STRUCTURE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
405 
406   g_object_class_install_property (gobject_class, PROP_STRICT_SSL,
407       g_param_spec_boolean ("ssl-strict", "SSL Strict",
408           "Strict SSL certificate checking",
409           GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER,
410           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
411 
412   g_object_class_install_property (gobject_class, PROP_SSL_CA_FILE,
413       g_param_spec_string ("ssl-ca-file", "SSL CA File",
414           "Location of an SSL CA file to use for checking SSL certificates",
415           GSTCURL_HANDLE_DEFAULT_CURLOPT_CAINFO,
416           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
417 
418   g_object_class_install_property (gobject_class, PROP_RETRIES,
419       g_param_spec_int ("retries", "Retries",
420           "Maximum number of retries until giving up (-1=infinite)",
421           GSTCURL_HANDLE_MIN_RETRIES, GSTCURL_HANDLE_MAX_RETRIES,
422           GSTCURL_HANDLE_DEFAULT_RETRIES,
423           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
424 
425   g_object_class_install_property (gobject_class, PROP_CONNECTIONMAXTIME,
426       g_param_spec_uint ("max-connection-time", "Max-Connection-Time",
427           "Maximum amount of time to keep-alive HTTP connections",
428           GSTCURL_MIN_CONNECTION_TIME, GSTCURL_MAX_CONNECTION_TIME,
429           GSTCURL_DEFAULT_CONNECTION_TIME,
430           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
431 
432   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_SERVER,
433       g_param_spec_uint ("max-connections-per-server",
434           "Max-Connections-Per-Server",
435           "Maximum number of connections allowed per server for HTTP/1.x",
436           GSTCURL_MIN_CONNECTIONS_SERVER, GSTCURL_MAX_CONNECTIONS_SERVER,
437           GSTCURL_DEFAULT_CONNECTIONS_SERVER,
438           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439 
440   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_PROXY,
441       g_param_spec_uint ("max-connections-per-proxy",
442           "Max-Connections-Per-Proxy",
443           "Maximum number of concurrent connections allowed per proxy for HTTP/1.x",
444           GSTCURL_MIN_CONNECTIONS_PROXY, GSTCURL_MAX_CONNECTIONS_PROXY,
445           GSTCURL_DEFAULT_CONNECTIONS_PROXY,
446           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447 
448   g_object_class_install_property (gobject_class, PROP_MAXCONCURRENT_GLOBAL,
449       g_param_spec_uint ("max-connections", "Max-Connections",
450           "Maximum number of concurrent connections allowed for HTTP/1.x",
451           GSTCURL_MIN_CONNECTIONS_GLOBAL, GSTCURL_MAX_CONNECTIONS_GLOBAL,
452           GSTCURL_DEFAULT_CONNECTIONS_GLOBAL,
453           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
454 
455   g_object_class_install_property (gobject_class, PROP_HTTPVERSION,
456       g_param_spec_enum ("http-version", "HTTP-Version",
457           "The preferred HTTP protocol version",
458           GST_TYPE_CURL_HTTP_VERSION, pref_http_ver,
459           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
460 
461   /* Add a debugging task so it's easier to debug in the Multi worker thread */
462   GST_DEBUG_CATEGORY_INIT (gst_curl_loop_debug, "curl_multi_loop", 0,
463       "libcURL loop thread debugging");
464 #ifndef GST_DISABLE_GST_DEBUG
465   gst_debug_log (gst_curl_loop_debug, GST_LEVEL_INFO, __FILE__, __func__,
466       __LINE__, NULL, "Testing the curl_multi_loop debugging prints");
467 #endif
468 
469   klass->multi_task_context.task = NULL;
470   klass->multi_task_context.refcount = 0;
471   klass->multi_task_context.queue = NULL;
472   klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
473   klass->multi_task_context.multi_handle = NULL;
474   g_mutex_init (&klass->multi_task_context.mutex);
475   g_cond_init (&klass->multi_task_context.signal);
476 
477   gst_element_class_set_static_metadata (gstelement_class,
478       "HTTP Client Source using libcURL",
479       "Source/Network",
480       "Receiver data as a client over a network via HTTP using cURL",
481       "Sam Hurst <samuelh@rd.bbc.co.uk>");
482 }
483 
484 static void
gst_curl_http_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)485 gst_curl_http_src_set_property (GObject * object, guint prop_id,
486     const GValue * value, GParamSpec * pspec)
487 {
488   GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
489   GSTCURL_FUNCTION_ENTRY (source);
490 
491   switch (prop_id) {
492     case PROP_URI:
493       g_mutex_lock (&source->uri_mutex);
494       g_free (source->uri);
495       source->uri = g_value_dup_string (value);
496       g_mutex_unlock (&source->uri_mutex);
497       break;
498     case PROP_USERNAME:
499       g_free (source->username);
500       source->username = g_value_dup_string (value);
501       break;
502     case PROP_PASSWORD:
503       g_free (source->password);
504       source->password = g_value_dup_string (value);
505       break;
506     case PROP_PROXYURI:
507       g_free (source->proxy_uri);
508       source->proxy_uri = g_value_dup_string (value);
509       break;
510     case PROP_PROXYUSERNAME:
511       g_free (source->proxy_user);
512       source->proxy_user = g_value_dup_string (value);
513       break;
514     case PROP_PROXYPASSWORD:
515       g_free (source->proxy_pass);
516       source->proxy_pass = g_value_dup_string (value);
517       break;
518     case PROP_COOKIES:
519       g_strfreev (source->cookies);
520       source->cookies = g_strdupv (g_value_get_boxed (value));
521       source->number_cookies = g_strv_length (source->cookies);
522       break;
523     case PROP_USERAGENT:
524       g_free (source->user_agent);
525       source->user_agent = g_value_dup_string (value);
526       break;
527     case PROP_HEADERS:
528     {
529       const GstStructure *s = gst_value_get_structure (value);
530       if (source->request_headers)
531         gst_structure_free (source->request_headers);
532       source->request_headers =
533           s ? gst_structure_copy (s) :
534           gst_structure_new_empty (REQUEST_HEADERS_NAME);
535     }
536       break;
537     case PROP_COMPRESS:
538       source->accept_compressed_encodings = g_value_get_boolean (value);
539       break;
540     case PROP_REDIRECT:
541       source->allow_3xx_redirect = g_value_get_boolean (value);
542       break;
543     case PROP_MAXREDIRECT:
544       source->max_3xx_redirects = g_value_get_int (value);
545       break;
546     case PROP_KEEPALIVE:
547       source->keep_alive = g_value_get_boolean (value);
548       break;
549     case PROP_TIMEOUT:
550       source->timeout_secs = g_value_get_int (value);
551       break;
552     case PROP_STRICT_SSL:
553       source->strict_ssl = g_value_get_boolean (value);
554       break;
555     case PROP_SSL_CA_FILE:
556       source->custom_ca_file = g_value_dup_string (value);
557       break;
558     case PROP_RETRIES:
559       source->total_retries = g_value_get_int (value);
560       break;
561     case PROP_CONNECTIONMAXTIME:
562       source->max_connection_time = g_value_get_uint (value);
563       break;
564     case PROP_MAXCONCURRENT_SERVER:
565       source->max_conns_per_server = g_value_get_uint (value);
566       break;
567     case PROP_MAXCONCURRENT_PROXY:
568       source->max_conns_per_proxy = g_value_get_uint (value);
569       break;
570     case PROP_MAXCONCURRENT_GLOBAL:
571       source->max_conns_global = g_value_get_uint (value);
572       break;
573     case PROP_HTTPVERSION:
574       source->preferred_http_version = g_value_get_enum (value);
575       break;
576     default:
577       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
578       break;
579   }
580   GSTCURL_FUNCTION_EXIT (source);
581 }
582 
583 static void
gst_curl_http_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)584 gst_curl_http_src_get_property (GObject * object, guint prop_id,
585     GValue * value, GParamSpec * pspec)
586 {
587   GstCurlHttpSrc *source = GST_CURLHTTPSRC (object);
588   GSTCURL_FUNCTION_ENTRY (source);
589 
590   switch (prop_id) {
591     case PROP_URI:
592       g_mutex_lock (&source->uri_mutex);
593       g_value_set_string (value, source->uri);
594       g_mutex_unlock (&source->uri_mutex);
595       break;
596     case PROP_USERNAME:
597       g_value_set_string (value, source->username);
598       break;
599     case PROP_PASSWORD:
600       g_value_set_string (value, source->password);
601       break;
602     case PROP_PROXYURI:
603       g_value_set_string (value, source->proxy_uri);
604       break;
605     case PROP_PROXYUSERNAME:
606       g_value_set_string (value, source->proxy_user);
607       break;
608     case PROP_PROXYPASSWORD:
609       g_value_set_string (value, source->proxy_pass);
610       break;
611     case PROP_COOKIES:
612       g_value_set_boxed (value, source->cookies);
613       break;
614     case PROP_USERAGENT:
615       g_value_set_string (value, source->user_agent);
616       break;
617     case PROP_HEADERS:
618       gst_value_set_structure (value, source->request_headers);
619       break;
620     case PROP_COMPRESS:
621       g_value_set_boolean (value, source->accept_compressed_encodings);
622       break;
623     case PROP_REDIRECT:
624       g_value_set_boolean (value, source->allow_3xx_redirect);
625       break;
626     case PROP_MAXREDIRECT:
627       g_value_set_int (value, source->max_3xx_redirects);
628       break;
629     case PROP_KEEPALIVE:
630       g_value_set_boolean (value, source->keep_alive);
631       break;
632     case PROP_TIMEOUT:
633       g_value_set_int (value, source->timeout_secs);
634       break;
635     case PROP_STRICT_SSL:
636       g_value_set_boolean (value, source->strict_ssl);
637       break;
638     case PROP_SSL_CA_FILE:
639       g_value_set_string (value, source->custom_ca_file);
640       break;
641     case PROP_RETRIES:
642       g_value_set_int (value, source->total_retries);
643       break;
644     case PROP_CONNECTIONMAXTIME:
645       g_value_set_uint (value, source->max_connection_time);
646       break;
647     case PROP_MAXCONCURRENT_SERVER:
648       g_value_set_uint (value, source->max_conns_per_server);
649       break;
650     case PROP_MAXCONCURRENT_PROXY:
651       g_value_set_uint (value, source->max_conns_per_proxy);
652       break;
653     case PROP_MAXCONCURRENT_GLOBAL:
654       g_value_set_uint (value, source->max_conns_global);
655       break;
656     case PROP_HTTPVERSION:
657       g_value_set_enum (value, source->preferred_http_version);
658       break;
659     default:
660       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
661       break;
662   }
663   GSTCURL_FUNCTION_EXIT (source);
664 }
665 
666 static void
gst_curl_http_src_init(GstCurlHttpSrc * source)667 gst_curl_http_src_init (GstCurlHttpSrc * source)
668 {
669   GSTCURL_FUNCTION_ENTRY (source);
670 
671   /* Assume everything is already free'd */
672   source->uri = NULL;
673   source->redirect_uri = NULL;
674   source->username = GSTCURL_HANDLE_DEFAULT_CURLOPT_USERNAME;
675   source->password = GSTCURL_HANDLE_DEFAULT_CURLOPT_PASSWORD;
676   source->proxy_uri = NULL;
677   source->proxy_user = NULL;
678   source->proxy_pass = NULL;
679   source->cookies = NULL;
680   g_assert (gst_curl_http_src_curl_capabilities != NULL);
681   source->user_agent =
682       g_strdup_printf (GSTCURL_HANDLE_DEFAULT_CURLOPT_USERAGENT "/%s",
683       gst_curl_http_src_curl_capabilities->version);
684   source->number_cookies = 0;
685   source->request_headers = gst_structure_new_empty (REQUEST_HEADERS_NAME);
686   source->allow_3xx_redirect = GSTCURL_HANDLE_DEFAULT_CURLOPT_FOLLOWLOCATION;
687   source->max_3xx_redirects = GSTCURL_HANDLE_DEFAULT_CURLOPT_MAXREDIRS;
688   source->keep_alive = GSTCURL_HANDLE_DEFAULT_CURLOPT_TCP_KEEPALIVE;
689   source->timeout_secs = GSTCURL_HANDLE_DEFAULT_CURLOPT_TIMEOUT;
690   source->max_connection_time = GSTCURL_DEFAULT_CONNECTION_TIME;
691   source->max_conns_per_server = GSTCURL_DEFAULT_CONNECTIONS_SERVER;
692   source->max_conns_per_proxy = GSTCURL_DEFAULT_CONNECTIONS_PROXY;
693   source->max_conns_global = GSTCURL_DEFAULT_CONNECTIONS_GLOBAL;
694   source->strict_ssl = GSTCURL_HANDLE_DEFAULT_CURLOPT_SSL_VERIFYPEER;
695   source->custom_ca_file = NULL;
696   source->preferred_http_version = pref_http_ver;
697   source->total_retries = GSTCURL_HANDLE_DEFAULT_RETRIES;
698   source->retries_remaining = source->total_retries;
699   source->slist = NULL;
700   source->accept_compressed_encodings = FALSE;
701 
702   gst_base_src_set_automatic_eos (GST_BASE_SRC (source), FALSE);
703 
704   source->proxy_uri = g_strdup (g_getenv ("http_proxy"));
705   source->no_proxy_list = g_strdup (g_getenv ("no_proxy"));
706 
707   g_mutex_init (&source->uri_mutex);
708   g_mutex_init (&source->buffer_mutex);
709   g_cond_init (&source->buffer_cond);
710 
711   source->buffer = NULL;
712   source->buffer_len = 0;
713   source->state = GSTCURL_NONE;
714   source->pending_state = GSTCURL_NONE;
715   source->transfer_begun = FALSE;
716   source->data_received = FALSE;
717   source->connection_status = GSTCURL_NOT_CONNECTED;
718 
719   source->http_headers = NULL;
720   source->content_type = NULL;
721   source->status_code = 0;
722   source->reason_phrase = NULL;
723   source->hdrs_updated = FALSE;
724   source->curl_result = CURLE_OK;
725   gst_caps_replace (&source->caps, NULL);
726 
727   GSTCURL_FUNCTION_EXIT (source);
728 }
729 
730 /*
731  * Check if the Curl multi loop has been started. If not, initialise it and
732  * start it running. If it is already running, increment the refcount.
733  */
734 static void
gst_curl_http_src_ref_multi(GstCurlHttpSrc * src)735 gst_curl_http_src_ref_multi (GstCurlHttpSrc * src)
736 {
737   GstCurlHttpSrcClass *klass;
738 
739   GSTCURL_FUNCTION_ENTRY (src);
740 
741   /*klass = (GstCurlHttpSrcClass) g_type_class_peek_parent (src); */
742   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
743       GstCurlHttpSrcClass);
744 
745   g_mutex_lock (&klass->multi_task_context.mutex);
746   if (klass->multi_task_context.refcount == 0) {
747     /* Set up various in-task properties */
748 
749     /* NULL is treated as the start of the list, no need to allocate. */
750     klass->multi_task_context.queue = NULL;
751 
752     /* set up curl */
753     klass->multi_task_context.multi_handle = curl_multi_init ();
754 
755     curl_multi_setopt (klass->multi_task_context.multi_handle,
756         CURLMOPT_PIPELINING, 1);
757 #ifdef CURLMOPT_MAX_HOST_CONNECTIONS
758     curl_multi_setopt (klass->multi_task_context.multi_handle,
759         CURLMOPT_MAX_HOST_CONNECTIONS, 1);
760 #endif
761 
762     /* Start the thread */
763     g_rec_mutex_init (&klass->multi_task_context.task_rec_mutex);
764     klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_RUNNING;
765     klass->multi_task_context.task = gst_task_new (
766         (GstTaskFunction) gst_curl_http_src_curl_multi_loop,
767         (gpointer) & klass->multi_task_context, NULL);
768     gst_task_set_lock (klass->multi_task_context.task,
769         &klass->multi_task_context.task_rec_mutex);
770     if (gst_task_start (klass->multi_task_context.task) == FALSE) {
771       /*
772        * This is a pretty critical failure and is not recoverable, so commit
773        * sudoku and run away.
774        */
775       GSTCURL_ERROR_PRINT ("Couldn't start curl_multi task! Aborting.");
776       abort ();
777     }
778     GSTCURL_INFO_PRINT ("Curl multi loop has been correctly initialised!");
779   }
780   klass->multi_task_context.refcount++;
781   g_mutex_unlock (&klass->multi_task_context.mutex);
782 
783   GSTCURL_FUNCTION_EXIT (src);
784 }
785 
786 /*
787  * Decrement the reference count on the curl multi loop. If this is called by
788  * the last instance to hold a reference, shut down the worker. (Otherwise
789  * GStreamer can't close down with a thread still running). Also offers the
790  * "force_all" boolean parameter, which if TRUE removes all references and shuts
791  * down.
792  */
793 static void
gst_curl_http_src_unref_multi(GstCurlHttpSrc * src)794 gst_curl_http_src_unref_multi (GstCurlHttpSrc * src)
795 {
796   GstCurlHttpSrcClass *klass;
797 
798   GSTCURL_FUNCTION_ENTRY (src);
799 
800   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
801       GstCurlHttpSrcClass);
802 
803   g_mutex_lock (&klass->multi_task_context.mutex);
804   klass->multi_task_context.refcount--;
805   GST_INFO_OBJECT (src, "Closing instance, worker thread refcount is now %u",
806       klass->multi_task_context.refcount);
807 
808   if (klass->multi_task_context.refcount == 0) {
809     /* Everything's done! Clean up. */
810     gst_task_stop (klass->multi_task_context.task);
811     klass->multi_task_context.state = GSTCURL_MULTI_LOOP_STATE_STOP;
812     g_cond_signal (&klass->multi_task_context.signal);
813     g_mutex_unlock (&klass->multi_task_context.mutex);
814     GST_DEBUG_OBJECT (src, "Joining curl_multi_loop task...");
815     gst_task_join (klass->multi_task_context.task);
816     gst_object_unref (klass->multi_task_context.task);
817     klass->multi_task_context.task = NULL;
818     curl_multi_cleanup (klass->multi_task_context.multi_handle);
819     klass->multi_task_context.multi_handle = NULL;
820     g_rec_mutex_clear (&klass->multi_task_context.task_rec_mutex);
821     GST_DEBUG_OBJECT (src, "multi_task_context cleanup complete");
822   } else {
823     g_mutex_unlock (&klass->multi_task_context.mutex);
824   }
825 
826   GSTCURL_FUNCTION_EXIT (src);
827 }
828 
829 static void
gst_curl_http_src_finalize(GObject * obj)830 gst_curl_http_src_finalize (GObject * obj)
831 {
832   GstCurlHttpSrc *src = GST_CURLHTTPSRC (obj);
833 
834   GSTCURL_FUNCTION_ENTRY (src);
835 
836   /* Cleanup all memory allocated */
837   gst_curl_http_src_cleanup_instance (src);
838 
839   GSTCURL_FUNCTION_EXIT (src);
840 
841   /* Chain up to parent class */
842   G_OBJECT_CLASS (gst_curl_http_src_parent_class)->finalize (obj);
843 }
844 
845 /*
846  * Do the transfer. If the transfer hasn't begun yet, start a new curl handle
847  * and pass it to the multi queue to be operated on. Then wait for any blocks
848  * of data and push them to the source pad.
849  */
850 static GstFlowReturn
gst_curl_http_src_create(GstPushSrc * psrc,GstBuffer ** outbuf)851 gst_curl_http_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
852 {
853   GstFlowReturn ret;
854   GstCurlHttpSrc *src = GST_CURLHTTPSRC (psrc);
855   GstCurlHttpSrcClass *klass;
856   GstStructure *empty_headers;
857   GstBaseSrc *basesrc;
858 
859   GSTCURL_FUNCTION_ENTRY (src);
860 
861   klass = G_TYPE_INSTANCE_GET_CLASS (src, GST_TYPE_CURL_HTTP_SRC,
862       GstCurlHttpSrcClass);
863   basesrc = GST_BASE_SRC_CAST (src);
864 
865 retry:
866   ret = GST_FLOW_OK;
867   /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
868      needed, multi_task_context.mutex must be acquired first */
869   g_mutex_lock (&klass->multi_task_context.mutex);
870   g_mutex_lock (&src->buffer_mutex);
871   if (src->state == GSTCURL_UNLOCK) {
872     ret = GST_FLOW_FLUSHING;
873     goto escape;
874   }
875 
876   if (!src->transfer_begun) {
877     GST_DEBUG_OBJECT (src, "Starting new request for URI %s", src->uri);
878     /* Create the Easy Handle and set up the session. */
879     src->curl_handle = gst_curl_http_src_create_easy_handle (src);
880     if (src->curl_handle == NULL) {
881       ret = GST_FLOW_ERROR;
882       goto escape;
883     }
884 
885     if (gst_curl_http_src_add_queue_item (&klass->multi_task_context.queue, src)
886         == FALSE) {
887       GST_ERROR_OBJECT (src, "Couldn't create new queue item! Aborting...");
888       ret = GST_FLOW_ERROR;
889       goto escape;
890     }
891     /* Signal the worker thread */
892     g_cond_signal (&klass->multi_task_context.signal);
893 
894     src->state = GSTCURL_OK;
895     src->transfer_begun = TRUE;
896     src->data_received = FALSE;
897 
898     GST_DEBUG_OBJECT (src, "Submitted request for URI %s to curl", src->uri);
899 
900     if (src->http_headers != NULL) {
901       gst_structure_free (src->http_headers);
902     }
903     empty_headers = gst_structure_new_empty (RESPONSE_HEADERS_NAME);
904     src->http_headers = gst_structure_new (HTTP_HEADERS_NAME,
905         URI_NAME, G_TYPE_STRING, src->uri,
906         REQUEST_HEADERS_NAME, GST_TYPE_STRUCTURE, src->request_headers,
907         RESPONSE_HEADERS_NAME, GST_TYPE_STRUCTURE, empty_headers, NULL);
908     gst_structure_free (empty_headers);
909     GST_INFO_OBJECT (src, "Created a new headers object");
910   }
911 
912   g_mutex_unlock (&klass->multi_task_context.mutex);
913 
914   /* Wait for data to become available, then punt it downstream */
915   while ((src->buffer_len == 0) && (src->state == GSTCURL_OK)
916       && (src->connection_status == GSTCURL_CONNECTED)) {
917     g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
918   }
919 
920   if (src->state == GSTCURL_UNLOCK) {
921     if (src->buffer_len > 0) {
922       g_free (src->buffer);
923       src->buffer = NULL;
924       src->buffer_len = 0;
925     }
926     g_mutex_unlock (&src->buffer_mutex);
927     return GST_FLOW_FLUSHING;
928   }
929 
930   ret = gst_curl_http_src_handle_response (src);
931   switch (ret) {
932     case GST_FLOW_ERROR:
933       /* Don't attempt a retry, just bomb out */
934       g_mutex_unlock (&src->buffer_mutex);
935       return ret;
936     case GST_FLOW_CUSTOM_ERROR:
937       if (src->data_received == TRUE) {
938         /*
939          * If data has already been received, we can't recall previously sent
940          * buffers so don't attempt a retry in this case.
941          *
942          * TODO: Remember the position we got to, and make a range request for
943          * the resource without the bit we've already received?
944          */
945         GST_WARNING_OBJECT (src,
946             "Failed mid-transfer, can't continue for URI %s", src->uri);
947         g_mutex_unlock (&src->buffer_mutex);
948         return GST_FLOW_ERROR;
949       }
950       src->retries_remaining--;
951       if (src->retries_remaining == 0) {
952         GST_WARNING_OBJECT (src, "Out of retries for URI %s", src->uri);
953         g_mutex_unlock (&src->buffer_mutex);
954         return GST_FLOW_ERROR;  /* Don't attempt a retry, just bomb out */
955       }
956       GST_INFO_OBJECT (src, "Attempting retry for URI %s", src->uri);
957       src->state = GSTCURL_NONE;
958       src->transfer_begun = FALSE;
959       src->status_code = 0;
960       g_free (src->reason_phrase);
961       src->reason_phrase = NULL;
962       src->hdrs_updated = FALSE;
963       if (src->http_headers != NULL) {
964         gst_structure_free (src->http_headers);
965         src->http_headers = NULL;
966         GST_INFO_OBJECT (src, "NULL'd the headers");
967       }
968       gst_curl_http_src_destroy_easy_handle (src);
969       g_mutex_unlock (&src->buffer_mutex);
970       goto retry;               /* Attempt a retry! */
971     default:
972       break;
973   }
974 
975   if (((src->state == GSTCURL_OK) || (src->state == GSTCURL_DONE)) &&
976       (src->buffer_len > 0)) {
977 
978     GST_DEBUG_OBJECT (src, "Pushing %u bytes of transfer for URI %s to pad",
979         src->buffer_len, src->uri);
980     *outbuf = gst_buffer_new_allocate (NULL, src->buffer_len, NULL);
981     gst_buffer_fill (*outbuf, 0, src->buffer, src->buffer_len);
982     GST_BUFFER_OFFSET (*outbuf) = basesrc->segment.position;
983 
984     g_free (src->buffer);
985     src->buffer = NULL;
986     src->buffer_len = 0;
987     src->data_received = TRUE;
988 
989     /* ret should still be GST_FLOW_OK */
990   } else if ((src->state == GSTCURL_DONE) && (src->buffer_len == 0)) {
991     GST_INFO_OBJECT (src, "Full body received, signalling EOS for URI %s.",
992         src->uri);
993     src->state = GSTCURL_NONE;
994     src->transfer_begun = FALSE;
995     src->status_code = 0;
996     g_free (src->reason_phrase);
997     src->reason_phrase = NULL;
998     src->hdrs_updated = FALSE;
999     gst_curl_http_src_destroy_easy_handle (src);
1000     ret = GST_FLOW_EOS;
1001   } else {
1002     switch (src->state) {
1003       case GSTCURL_NONE:
1004         GST_WARNING_OBJECT (src, "Got unexpected GSTCURL_NONE state!");
1005         break;
1006       case GSTCURL_REMOVED:
1007         GST_WARNING_OBJECT (src, "Transfer got removed from the curl queue");
1008         ret = GST_FLOW_EOS;
1009         break;
1010       case GSTCURL_BAD_QUEUE_REQUEST:
1011         GST_ERROR_OBJECT (src, "Bad Queue Request!");
1012         ret = GST_FLOW_ERROR;
1013         break;
1014       case GSTCURL_TOTAL_ERROR:
1015         GST_ERROR_OBJECT (src, "Critical, unrecoverable error!");
1016         ret = GST_FLOW_ERROR;
1017         break;
1018       case GSTCURL_PIPELINE_NULL:
1019         GST_ERROR_OBJECT (src, "Pipeline null");
1020         break;
1021       default:
1022         GST_ERROR_OBJECT (src, "Unknown state of %u", src->state);
1023     }
1024   }
1025   g_mutex_unlock (&src->buffer_mutex);
1026   GSTCURL_FUNCTION_EXIT (src);
1027   return ret;
1028 
1029 escape:
1030   g_mutex_unlock (&src->buffer_mutex);
1031   g_mutex_unlock (&klass->multi_task_context.mutex);
1032 
1033   GSTCURL_FUNCTION_EXIT (src);
1034   return ret;
1035 }
1036 
1037 /*
1038  * Convert header from a GstStructure type to a curl_slist type that curl will
1039  * understand.
1040  */
1041 static gboolean
_headers_to_curl_slist(GQuark field_id,const GValue * value,gpointer ptr)1042 _headers_to_curl_slist (GQuark field_id, const GValue * value, gpointer ptr)
1043 {
1044   gchar *field;
1045   struct curl_slist **p_slist = ptr;
1046 
1047   field = g_strdup_printf ("%s: %s", g_quark_to_string (field_id),
1048       g_value_get_string (value));
1049 
1050   *p_slist = curl_slist_append (*p_slist, field);
1051 
1052   g_free (field);
1053 
1054   return TRUE;
1055 }
1056 
1057 /*
1058  * From the data in the queue element s, create a CURL easy handle and populate
1059  * options with the URL, proxy data, login options, cookies,
1060  */
1061 static CURL *
gst_curl_http_src_create_easy_handle(GstCurlHttpSrc * s)1062 gst_curl_http_src_create_easy_handle (GstCurlHttpSrc * s)
1063 {
1064   CURL *handle;
1065   gint i;
1066   GSTCURL_FUNCTION_ENTRY (s);
1067 
1068   /* This is mandatory and yet not default option, so if this is NULL
1069    * then something very bad is going on. */
1070   if (s->uri == NULL) {
1071     GST_ERROR_OBJECT (s, "No URI for curl!");
1072     return NULL;
1073   }
1074 
1075   handle = curl_easy_init ();
1076   if (handle == NULL) {
1077     GST_ERROR_OBJECT (s, "Couldn't init a curl easy handle!");
1078     return NULL;
1079   }
1080   GST_INFO_OBJECT (s, "Creating a new handle for URI %s", s->uri);
1081 
1082   gst_curl_setopt_str (s, handle, CURLOPT_URL, s->uri);
1083   gst_curl_setopt_str (s, handle, CURLOPT_USERNAME, s->username);
1084   gst_curl_setopt_str (s, handle, CURLOPT_PASSWORD, s->password);
1085   gst_curl_setopt_str (s, handle, CURLOPT_PROXY, s->proxy_uri);
1086   gst_curl_setopt_str (s, handle, CURLOPT_NOPROXY, s->no_proxy_list);
1087   gst_curl_setopt_str (s, handle, CURLOPT_PROXYUSERNAME, s->proxy_user);
1088   gst_curl_setopt_str (s, handle, CURLOPT_PROXYPASSWORD, s->proxy_pass);
1089 
1090   for (i = 0; i < s->number_cookies; i++) {
1091     gst_curl_setopt_str (s, handle, CURLOPT_COOKIELIST, s->cookies[i]);
1092   }
1093 
1094   /* curl_slist_append dynamically allocates memory, but I need to free it */
1095   if (s->request_headers != NULL) {
1096     gst_structure_foreach (s->request_headers, _headers_to_curl_slist,
1097         &s->slist);
1098     if (curl_easy_setopt (handle, CURLOPT_HTTPHEADER, s->slist) != CURLE_OK) {
1099       GST_WARNING_OBJECT (s, "Failed to set HTTP headers!");
1100     }
1101   }
1102 
1103   gst_curl_setopt_str_default (s, handle, CURLOPT_USERAGENT, s->user_agent);
1104 
1105   /*
1106    * Unlike soup, this isn't a binary op, curl wants a string here. So if it's
1107    * TRUE, simply set the value as an empty string as this allows both gzip and
1108    * zlib compression methods.
1109    */
1110   if (s->accept_compressed_encodings == TRUE) {
1111     gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "");
1112   } else {
1113     gst_curl_setopt_str (s, handle, CURLOPT_ACCEPT_ENCODING, "identity");
1114   }
1115 
1116   gst_curl_setopt_int (s, handle, CURLOPT_FOLLOWLOCATION,
1117       s->allow_3xx_redirect);
1118   gst_curl_setopt_int_default (s, handle, CURLOPT_MAXREDIRS,
1119       s->max_3xx_redirects);
1120   gst_curl_setopt_bool (s, handle, CURLOPT_TCP_KEEPALIVE, s->keep_alive);
1121   gst_curl_setopt_int (s, handle, CURLOPT_TIMEOUT, s->timeout_secs);
1122   gst_curl_setopt_bool (s, handle, CURLOPT_SSL_VERIFYPEER, s->strict_ssl);
1123   gst_curl_setopt_str (s, handle, CURLOPT_CAINFO, s->custom_ca_file);
1124 
1125   switch (s->preferred_http_version) {
1126     case GSTCURL_HTTP_VERSION_1_0:
1127       GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.0");
1128       gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1129           CURL_HTTP_VERSION_1_0);
1130       break;
1131     case GSTCURL_HTTP_VERSION_1_1:
1132       GST_DEBUG_OBJECT (s, "Setting version as HTTP/1.1");
1133       gst_curl_setopt_int (s, handle, CURLOPT_HTTP_VERSION,
1134           CURL_HTTP_VERSION_1_1);
1135       break;
1136 #ifdef CURL_VERSION_HTTP2
1137     case GSTCURL_HTTP_VERSION_2_0:
1138       GST_DEBUG_OBJECT (s, "Setting version as HTTP/2.0");
1139       if (curl_easy_setopt (handle, CURLOPT_HTTP_VERSION,
1140               CURL_HTTP_VERSION_2_0) != CURLE_OK) {
1141         if (gst_curl_http_src_curl_capabilities->features & CURL_VERSION_HTTP2) {
1142           GST_WARNING_OBJECT (s,
1143               "Cannot set unsupported option CURLOPT_HTTP_VERSION");
1144         } else {
1145           GST_INFO_OBJECT (s, "HTTP/2 unsupported by libcurl at this time");
1146         }
1147       }
1148       break;
1149 #endif
1150     default:
1151       GST_WARNING_OBJECT (s,
1152           "Supplied a bogus HTTP version, using curl default!");
1153   }
1154 
1155   gst_curl_setopt_generic (s, handle, CURLOPT_HEADERFUNCTION,
1156       gst_curl_http_src_get_header);
1157   gst_curl_setopt_str (s, handle, CURLOPT_HEADERDATA, s);
1158   gst_curl_setopt_generic (s, handle, CURLOPT_WRITEFUNCTION,
1159       gst_curl_http_src_get_chunks);
1160   gst_curl_setopt_str (s, handle, CURLOPT_WRITEDATA, s);
1161 
1162   gst_curl_setopt_str (s, handle, CURLOPT_ERRORBUFFER, s->curl_errbuf);
1163 
1164   GSTCURL_FUNCTION_EXIT (s);
1165   return handle;
1166 }
1167 
1168 /*
1169  * Check the return type from the curl transfer. If it was okay, then deal with
1170  * any headers that were received. Headers should only be dealt with once - but
1171  * we might get a second set if there are trailing headers (RFC7230 Section 4.4)
1172  */
1173 static GstFlowReturn
gst_curl_http_src_handle_response(GstCurlHttpSrc * src)1174 gst_curl_http_src_handle_response (GstCurlHttpSrc * src)
1175 {
1176   glong curl_info_long;
1177   gdouble curl_info_dbl;
1178   gchar *redirect_url;
1179   GstBaseSrc *basesrc;
1180   const GValue *response_headers;
1181   GstFlowReturn ret = GST_FLOW_OK;
1182 
1183   GSTCURL_FUNCTION_ENTRY (src);
1184 
1185   GST_TRACE_OBJECT (src, "status code: %d (%s), curl return code %d",
1186       src->status_code, src->reason_phrase, src->curl_result);
1187 
1188   /* Check the curl result code first - anything not 0 is probably a failure */
1189   if (src->curl_result != 0) {
1190     GST_WARNING_OBJECT (src, "Curl failed the transfer (%d): %s",
1191         src->curl_result, curl_easy_strerror (src->curl_result));
1192     GST_DEBUG_OBJECT (src, "Reason for curl failure: %s", src->curl_errbuf);
1193     return GST_FLOW_ERROR;
1194   }
1195 
1196   /*
1197    * What response code do we have?
1198    */
1199   if (src->status_code >= 400) {
1200     GST_WARNING_OBJECT (src, "Transfer for URI %s returned error status %u",
1201         src->uri, src->status_code);
1202     src->retries_remaining = 0;
1203     CURL_HTTP_SRC_ERROR (src, RESOURCE, NOT_FOUND, (src->reason_phrase));
1204     return GST_FLOW_ERROR;
1205   } else if (src->status_code == 0) {
1206     if (curl_easy_getinfo (src->curl_handle, CURLINFO_TOTAL_TIME,
1207             &curl_info_dbl) != CURLE_OK) {
1208       /* Curl cannot be relied on in this state, so return an error. */
1209       return GST_FLOW_ERROR;
1210     }
1211     if (curl_info_dbl > src->timeout_secs) {
1212       return GST_FLOW_CUSTOM_ERROR;
1213     }
1214 
1215     if (curl_easy_getinfo (src->curl_handle, CURLINFO_OS_ERRNO,
1216             &curl_info_long) != CURLE_OK) {
1217       /* Curl cannot be relied on in this state, so return an error. */
1218       return GST_FLOW_ERROR;
1219 
1220     }
1221 
1222     GST_WARNING_OBJECT (src, "Errno for CONNECT call was %ld (%s)",
1223         curl_info_long, g_strerror ((gint) curl_info_long));
1224 
1225     /* Some of these responses are retry-able, others not. Set the returned
1226      * state to ERROR so we crash out instead of fruitlessly retrying.
1227      */
1228     if (curl_info_long == ECONNREFUSED) {
1229       return GST_FLOW_ERROR;
1230     }
1231     ret = GST_FLOW_CUSTOM_ERROR;
1232   }
1233 
1234 
1235   if (ret == GST_FLOW_CUSTOM_ERROR) {
1236     src->hdrs_updated = FALSE;
1237     GSTCURL_FUNCTION_EXIT (src);
1238     return ret;
1239   }
1240 
1241   /* Only do this once */
1242   if (src->hdrs_updated == FALSE) {
1243     GSTCURL_FUNCTION_EXIT (src);
1244     return GST_FLOW_OK;
1245   }
1246 
1247   /*
1248    * Deal with redirections...
1249    */
1250   if (curl_easy_getinfo (src->curl_handle, CURLINFO_EFFECTIVE_URL,
1251           &redirect_url)
1252       == CURLE_OK) {
1253     size_t lena, lenb;
1254     lena = strlen (src->uri);
1255     lenb = strlen (redirect_url);
1256     if (g_ascii_strncasecmp (src->uri, redirect_url,
1257             (lena > lenb) ? lenb : lena) != 0) {
1258       GST_INFO_OBJECT (src, "Got a redirect to %s, setting as redirect URI",
1259           redirect_url);
1260       src->redirect_uri = g_strdup (redirect_url);
1261       gst_structure_remove_field (src->http_headers, REDIRECT_URI_NAME);
1262       gst_structure_set (src->http_headers, REDIRECT_URI_NAME,
1263           G_TYPE_STRING, redirect_url, NULL);
1264     }
1265   }
1266 
1267   /*
1268    * Push the content length
1269    */
1270   if (curl_easy_getinfo (src->curl_handle, CURLINFO_CONTENT_LENGTH_DOWNLOAD,
1271           &curl_info_dbl) == CURLE_OK) {
1272     if (curl_info_dbl == -1) {
1273       GST_WARNING_OBJECT (src,
1274           "No Content-Length was specified in the response.");
1275     } else {
1276       GST_INFO_OBJECT (src, "Content-Length was given as %.0f", curl_info_dbl);
1277       basesrc = GST_BASE_SRC_CAST (src);
1278       basesrc->segment.duration = curl_info_dbl;
1279       gst_element_post_message (GST_ELEMENT (src),
1280           gst_message_new_duration_changed (GST_OBJECT (src)));
1281     }
1282   }
1283 
1284   /*
1285    * Push all the received headers down via a sicky event
1286    */
1287   response_headers = gst_structure_get_value (src->http_headers,
1288       RESPONSE_HEADERS_NAME);
1289   if (gst_structure_n_fields (gst_value_get_structure (response_headers)) > 0) {
1290     GstEvent *hdrs_event;
1291 
1292     gst_element_post_message (GST_ELEMENT_CAST (src),
1293         gst_message_new_element (GST_OBJECT_CAST (src),
1294             gst_structure_copy (src->http_headers)));
1295 
1296     /* gst_event_new_custom takes ownership of our structure */
1297     hdrs_event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_STICKY,
1298         gst_structure_copy (src->http_headers));
1299     gst_pad_push_event (GST_BASE_SRC_PAD (src), hdrs_event);
1300     GST_INFO_OBJECT (src, "Pushed headers downstream");
1301   }
1302 
1303   src->hdrs_updated = FALSE;
1304 
1305   GSTCURL_FUNCTION_EXIT (src);
1306 
1307   return ret;
1308 }
1309 
1310 /*
1311  * "Negotiate" capabilities between us and the sink.
1312  * I.e. tell the sink device what data to expect. We can't be told what to send
1313  * unless we implement "only return to me if this type" property. Potential TODO
1314  */
1315 static gboolean
gst_curl_http_src_negotiate_caps(GstCurlHttpSrc * src)1316 gst_curl_http_src_negotiate_caps (GstCurlHttpSrc * src)
1317 {
1318   const GValue *response_headers;
1319   const GstStructure *response_struct;
1320 
1321   GST_INFO_OBJECT (src, "Negotiating caps...");
1322   if (src->caps && src->http_headers) {
1323     response_headers =
1324         gst_structure_get_value (src->http_headers, RESPONSE_HEADERS_NAME);
1325     if (!response_headers) {
1326       GST_WARNING_OBJECT (src, "Failed to get %s", RESPONSE_HEADERS_NAME);
1327       return FALSE;
1328     }
1329     response_struct = gst_value_get_structure (response_headers);
1330     if (gst_structure_has_field_typed (response_struct, "content-type",
1331             G_TYPE_STRING)) {
1332       const gchar *content_type =
1333           gst_structure_get_string (response_struct, "content-type");
1334       GST_INFO_OBJECT (src, "Setting caps as Content-Type of %s", content_type);
1335       src->caps = gst_caps_make_writable (src->caps);
1336       gst_caps_set_simple (src->caps, "content-type", G_TYPE_STRING,
1337           content_type, NULL);
1338       if (gst_base_src_set_caps (GST_BASE_SRC (src), src->caps) != TRUE) {
1339         GST_ERROR_OBJECT (src, "Setting caps failed!");
1340         return FALSE;
1341       }
1342     }
1343   } else {
1344     GST_DEBUG_OBJECT (src, "No caps have been set, continue.");
1345   }
1346 
1347   return TRUE;
1348 }
1349 
1350 /*
1351  * Cleanup the CURL easy handle once we're done with it.
1352  */
1353 static inline void
gst_curl_http_src_destroy_easy_handle(GstCurlHttpSrc * src)1354 gst_curl_http_src_destroy_easy_handle (GstCurlHttpSrc * src)
1355 {
1356   /* Thank you Handles, and well done. Well done, mate. */
1357   if (src->curl_handle != NULL) {
1358     curl_easy_cleanup (src->curl_handle);
1359     src->curl_handle = NULL;
1360   }
1361   /* In addition, clean up the curl header slist if it was used. */
1362   if (src->slist != NULL) {
1363     curl_slist_free_all (src->slist);
1364     src->slist = NULL;
1365   }
1366 }
1367 
1368 static GstStateChangeReturn
gst_curl_http_src_change_state(GstElement * element,GstStateChange transition)1369 gst_curl_http_src_change_state (GstElement * element, GstStateChange transition)
1370 {
1371   GstStateChangeReturn ret;
1372   GstCurlHttpSrc *source = GST_CURLHTTPSRC (element);
1373   GSTCURL_FUNCTION_ENTRY (source);
1374 
1375   switch (transition) {
1376     case GST_STATE_CHANGE_NULL_TO_READY:
1377       gst_curl_http_src_ref_multi (source);
1378       break;
1379     case GST_STATE_CHANGE_READY_TO_PAUSED:
1380       if (source->uri == NULL) {
1381         GST_ELEMENT_ERROR (element, RESOURCE, OPEN_READ, (_("No URL set.")),
1382             ("Missing URL"));
1383         return GST_STATE_CHANGE_FAILURE;
1384       }
1385       break;
1386     case GST_STATE_CHANGE_READY_TO_NULL:
1387       GST_DEBUG_OBJECT (source, "Removing from multi_loop queue...");
1388       /* The pipeline has ended, so signal any running request to end
1389          and wait until the multi_loop has stopped using this element */
1390       gst_curl_http_src_wait_until_removed (source);
1391       gst_curl_http_src_unref_multi (source);
1392       break;
1393     default:
1394       break;
1395   }
1396 
1397   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1398 
1399   GSTCURL_FUNCTION_EXIT (source);
1400   return ret;
1401 }
1402 
1403 /*
1404  * Take care of any memory that may be left over from the instance that's now
1405  * closing before we leak it.
1406  */
1407 static void
gst_curl_http_src_cleanup_instance(GstCurlHttpSrc * src)1408 gst_curl_http_src_cleanup_instance (GstCurlHttpSrc * src)
1409 {
1410   gint i;
1411   g_mutex_lock (&src->uri_mutex);
1412   g_free (src->uri);
1413   src->uri = NULL;
1414   g_free (src->redirect_uri);
1415   src->redirect_uri = NULL;
1416   g_mutex_unlock (&src->uri_mutex);
1417   g_mutex_clear (&src->uri_mutex);
1418 
1419   g_free (src->proxy_uri);
1420   src->proxy_uri = NULL;
1421   g_free (src->no_proxy_list);
1422   src->no_proxy_list = NULL;
1423   g_free (src->proxy_user);
1424   src->proxy_user = NULL;
1425   g_free (src->proxy_pass);
1426   src->proxy_pass = NULL;
1427 
1428   for (i = 0; i < src->number_cookies; i++) {
1429     g_free (src->cookies[i]);
1430     src->cookies[i] = NULL;
1431   }
1432   g_free (src->cookies);
1433   src->cookies = NULL;
1434 
1435   g_free (src->user_agent);
1436   src->user_agent = NULL;
1437 
1438   g_mutex_clear (&src->buffer_mutex);
1439 
1440   g_cond_clear (&src->buffer_cond);
1441 
1442   g_free (src->buffer);
1443   src->buffer = NULL;
1444 
1445   if (src->request_headers) {
1446     gst_structure_free (src->request_headers);
1447     src->request_headers = NULL;
1448   }
1449   if (src->http_headers != NULL) {
1450     gst_structure_free (src->http_headers);
1451     src->http_headers = NULL;
1452   }
1453   g_free (src->reason_phrase);
1454   src->reason_phrase = NULL;
1455   gst_caps_replace (&src->caps, NULL);
1456 
1457   gst_curl_http_src_destroy_easy_handle (src);
1458 }
1459 
1460 static gboolean
gst_curl_http_src_query(GstBaseSrc * bsrc,GstQuery * query)1461 gst_curl_http_src_query (GstBaseSrc * bsrc, GstQuery * query)
1462 {
1463   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1464   gboolean ret;
1465   GSTCURL_FUNCTION_ENTRY (src);
1466 
1467   switch (GST_QUERY_TYPE (query)) {
1468     case GST_QUERY_URI:
1469       g_mutex_lock (&src->uri_mutex);
1470       gst_query_set_uri (query, src->uri);
1471       if (src->redirect_uri != NULL) {
1472         gst_query_set_uri_redirection (query, src->redirect_uri);
1473       }
1474       g_mutex_unlock (&src->uri_mutex);
1475       ret = TRUE;
1476       break;
1477     default:
1478       ret = GST_BASE_SRC_CLASS (parent_class)->query (bsrc, query);
1479       break;
1480   }
1481 
1482   GSTCURL_FUNCTION_EXIT (src);
1483   return ret;
1484 }
1485 
1486 static gboolean
gst_curl_http_src_get_content_length(GstBaseSrc * bsrc,guint64 * size)1487 gst_curl_http_src_get_content_length (GstBaseSrc * bsrc, guint64 * size)
1488 {
1489   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1490   const GValue *response_headers;
1491   gboolean ret = FALSE;
1492 
1493   if (src->http_headers == NULL) {
1494     return FALSE;
1495   }
1496 
1497   response_headers = gst_structure_get_value (src->http_headers,
1498       RESPONSE_HEADERS_NAME);
1499   if (gst_structure_has_field_typed (gst_value_get_structure (response_headers),
1500           "content-length", G_TYPE_STRING)) {
1501     const gchar *content_length =
1502         gst_structure_get_string (gst_value_get_structure (response_headers),
1503         "content-length");
1504     *size = (guint64) g_ascii_strtoull (content_length, NULL, 10);
1505     ret = TRUE;
1506   } else {
1507     GST_DEBUG_OBJECT (src,
1508         "No content length has yet been set, or there was an error!");
1509   }
1510   return ret;
1511 }
1512 
1513 static void
gst_curl_http_src_uri_handler_init(gpointer g_iface,gpointer iface_data)1514 gst_curl_http_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
1515 {
1516   GstURIHandlerInterface *uri_iface = (GstURIHandlerInterface *) g_iface;
1517 
1518   uri_iface->get_type = gst_curl_http_src_urihandler_get_type;
1519   uri_iface->get_protocols = gst_curl_http_src_urihandler_get_protocols;
1520   uri_iface->get_uri = gst_curl_http_src_urihandler_get_uri;
1521   uri_iface->set_uri = gst_curl_http_src_urihandler_set_uri;
1522 }
1523 
1524 static guint
gst_curl_http_src_urihandler_get_type(GType type)1525 gst_curl_http_src_urihandler_get_type (GType type)
1526 {
1527   return GST_URI_SRC;
1528 }
1529 
1530 static const gchar *const *
gst_curl_http_src_urihandler_get_protocols(GType type)1531 gst_curl_http_src_urihandler_get_protocols (GType type)
1532 {
1533   static const gchar *protocols[] = { "http", "https", NULL };
1534 
1535   return protocols;
1536 }
1537 
1538 static gchar *
gst_curl_http_src_urihandler_get_uri(GstURIHandler * handler)1539 gst_curl_http_src_urihandler_get_uri (GstURIHandler * handler)
1540 {
1541   gchar *ret;
1542   GstCurlHttpSrc *source;
1543 
1544   g_return_val_if_fail (GST_IS_URI_HANDLER (handler), NULL);
1545   source = GST_CURLHTTPSRC (handler);
1546 
1547   GSTCURL_FUNCTION_ENTRY (source);
1548 
1549   g_mutex_lock (&source->uri_mutex);
1550   ret = g_strdup (source->uri);
1551   g_mutex_unlock (&source->uri_mutex);
1552 
1553   GSTCURL_FUNCTION_EXIT (source);
1554   return ret;
1555 }
1556 
1557 static gboolean
gst_curl_http_src_urihandler_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)1558 gst_curl_http_src_urihandler_set_uri (GstURIHandler * handler,
1559     const gchar * uri, GError ** error)
1560 {
1561   GstCurlHttpSrc *source = GST_CURLHTTPSRC (handler);
1562   GSTCURL_FUNCTION_ENTRY (source);
1563 
1564   g_return_val_if_fail (GST_IS_URI_HANDLER (handler), FALSE);
1565   g_return_val_if_fail (uri != NULL, FALSE);
1566 
1567   g_mutex_lock (&source->uri_mutex);
1568 
1569   if (source->uri != NULL) {
1570     GST_DEBUG_OBJECT (source,
1571         "URI already present as %s, updating to new URI %s", source->uri, uri);
1572     g_free (source->uri);
1573   }
1574 
1575   source->uri = g_strdup (uri);
1576   if (source->uri == NULL) {
1577     g_mutex_unlock (&source->uri_mutex);
1578     return FALSE;
1579   }
1580   source->retries_remaining = source->total_retries;
1581 
1582   g_mutex_unlock (&source->uri_mutex);
1583 
1584   GSTCURL_FUNCTION_EXIT (source);
1585   return TRUE;
1586 }
1587 
1588 /*
1589  * Cancel any currently running transfer, and then signal all the loops to drop
1590  * any received buffers. The ::create() method should return GST_FLOW_FLUSHING.
1591  */
1592 static gboolean
gst_curl_http_src_unlock(GstBaseSrc * bsrc)1593 gst_curl_http_src_unlock (GstBaseSrc * bsrc)
1594 {
1595   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1596   gboolean want_removal = FALSE;
1597 
1598   g_mutex_lock (&src->buffer_mutex);
1599   if (src->state != GSTCURL_UNLOCK) {
1600     if (src->state == GSTCURL_OK) {
1601       /* A transfer is running, cancel it */
1602       if (src->connection_status == GSTCURL_CONNECTED) {
1603         src->connection_status = GSTCURL_WANT_REMOVAL;
1604       }
1605       want_removal = TRUE;
1606     }
1607     src->pending_state = src->state;
1608     src->state = GSTCURL_UNLOCK;
1609   }
1610   g_cond_signal (&src->buffer_cond);
1611   g_mutex_unlock (&src->buffer_mutex);
1612 
1613   if (want_removal) {
1614     GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
1615         GST_TYPE_CURL_HTTP_SRC,
1616         GstCurlHttpSrcClass);
1617     g_mutex_lock (&klass->multi_task_context.mutex);
1618     g_cond_signal (&klass->multi_task_context.signal);
1619     g_mutex_unlock (&klass->multi_task_context.mutex);
1620   }
1621 
1622   return TRUE;
1623 }
1624 
1625 /*
1626  * Finish the unlock request above and return curlhttpsrc to the normal state.
1627  * This will probably be GSTCURL_DONE, and the next return from ::create() will
1628  * be GST_FLOW_EOS as we don't want to deliver parts of a HTTP body.
1629  */
1630 static gboolean
gst_curl_http_src_unlock_stop(GstBaseSrc * bsrc)1631 gst_curl_http_src_unlock_stop (GstBaseSrc * bsrc)
1632 {
1633   GstCurlHttpSrc *src = GST_CURLHTTPSRC (bsrc);
1634 
1635   g_mutex_lock (&src->buffer_mutex);
1636   src->state = src->pending_state;
1637   src->pending_state = GSTCURL_NONE;
1638   g_cond_signal (&src->buffer_cond);
1639   g_mutex_unlock (&src->buffer_mutex);
1640 
1641   return TRUE;
1642 }
1643 
1644 /*****************************************************************************
1645  * Curl loop task functions begin
1646  *****************************************************************************/
/*
 * One pass of the curl multi worker.
 *
 * thread_data is the GstCurlHttpSrcMultiTaskContext shared by the elements.
 * A single call:
 *   1. sleeps on context->signal while the queue is empty and the loop state
 *      is RUNNING, and returns straight away if asked to STOP;
 *   2. walks the queue, removing the easy handle of every element flagged
 *      GSTCURL_WANT_REMOVAL and adding the easy handle of every connected
 *      element that has not been started yet;
 *   3. if at least one element is connected, waits in select() on curl's
 *      file descriptors, drives the transfers with curl_multi_perform() and
 *      dequeues every transfer curl reports as CURLMSG_DONE.
 *
 * There is no outer loop in this function; presumably the owning task calls
 * it repeatedly (NOTE(review): confirm against the task setup code).
 */
static void
gst_curl_http_src_curl_multi_loop (gpointer thread_data)
{
  GstCurlHttpSrcMultiTaskContext *context;
  GstCurlHttpSrcQueueElement *qelement, *qnext;
  gint i, still_running = 0;
  CURLMsg *curl_message;
  GstCurlHttpSrc *elt;
  guint active = 0;

  context = (GstCurlHttpSrcMultiTaskContext *) thread_data;

  g_mutex_lock (&context->mutex);

  /* Someone is holding a reference to us, but isn't using us so to avoid
   * unnecessary clock cycle wasting, sit in a conditional wait until woken.
   */
  while (context->queue == NULL
      && context->state == GSTCURL_MULTI_LOOP_STATE_RUNNING) {
    GSTCURL_DEBUG_PRINT ("Waiting for an element to be added...");
    g_cond_wait (&context->signal, &context->mutex);
    GSTCURL_DEBUG_PRINT ("Received wake up call!");
  }
  if (context->state == GSTCURL_MULTI_LOOP_STATE_STOP) {
    GSTCURL_INFO_PRINT ("Got instruction to shut down");
    goto out;
  }

  /* check for elements that need to be started or removed */
  qelement = context->queue;
  while (qelement != NULL) {
    /* Fetch the successor first: the current entry may be taken out of the
     * queue below (presumably invalidating qelement - TODO confirm in
     * gst_curl_http_src_remove_queue_item) */
    qnext = qelement->next;
    elt = qelement->p;
    /* NOTE: when both the buffer_mutex and multi_task_context.mutex are
       needed, multi_task_context.mutex must be acquired first */
    g_mutex_lock (&elt->buffer_mutex);
    if (elt->connection_status == GSTCURL_WANT_REMOVAL) {
      curl_multi_remove_handle (context->multi_handle, elt->curl_handle);
      /* While an unlock is in progress the REMOVED state is parked in
       * pending_state instead of overwriting GSTCURL_UNLOCK */
      if (elt->state == GSTCURL_UNLOCK) {
        elt->pending_state = GSTCURL_REMOVED;
      } else {
        elt->state = GSTCURL_REMOVED;
      }
      elt->connection_status = GSTCURL_NOT_CONNECTED;
      gst_curl_http_src_remove_queue_item (&context->queue, qelement->p);
      /* Wake any thread waiting on this element's buffer_cond */
      g_cond_signal (&elt->buffer_cond);
    } else if (elt->connection_status == GSTCURL_CONNECTED) {
      active++;
      /* The 0 -> 1 exchange succeeds only once per queue entry, so each easy
       * handle is added to the multi handle a single time */
      if (g_atomic_int_compare_and_exchange (&qelement->running, 0, 1)) {
        GSTCURL_DEBUG_PRINT ("Adding easy handle for URI %s", qelement->p->uri);
        curl_multi_add_handle (context->multi_handle, qelement->p->curl_handle);
      }
    }
    g_mutex_unlock (&elt->buffer_mutex);
    qelement = qnext;
  }

  if (active == 0) {
    GSTCURL_DEBUG_PRINT ("No active elements");
    goto out;
  }

  /* perform a select() on all of the active sockets and process any
     messages from curl */
  {
    struct timeval timeout;
    gint rc;
    fd_set fdread, fdwrite, fdexcep;
    int maxfd = -1;
    long curl_timeo = -1;
    gboolean cond = FALSE;

    /* Because curl can possibly take some time here, be nice and let go of the
     * mutex so other threads can perform state/queue operations as we don't
     * care about those until the end of this. */
    g_mutex_unlock (&context->mutex);

    FD_ZERO (&fdread);
    FD_ZERO (&fdwrite);
    FD_ZERO (&fdexcep);

    /* Default wait of one second, used when curl gives no timeout (< 0) */
    timeout.tv_sec = 1;
    timeout.tv_usec = 0;

    /* curl_timeo is in milliseconds. Anything of two seconds or more is
     * clamped to exactly one second; otherwise the sub-second remainder is
     * carried over into tv_usec. */
    curl_multi_timeout (context->multi_handle, &curl_timeo);
    if (curl_timeo >= 0) {
      timeout.tv_sec = curl_timeo / 1000;
      if (timeout.tv_sec > 1) {
        timeout.tv_sec = 1;
      } else {
        timeout.tv_usec = (curl_timeo % 1000) * 1000;
      }
    }

    /* get file descriptors from the transfers */
    curl_multi_fdset (context->multi_handle, &fdread, &fdwrite, &fdexcep,
        &maxfd);

    rc = select (maxfd + 1, &fdread, &fdwrite, &fdexcep, &timeout);

    switch (rc) {
      case -1:
        /* select error */
        /* Nothing is driven this pass; completed-transfer messages are still
         * collected below */
        break;
      case 0:
      default:
        /* timeout or readable/writable sockets */
        curl_multi_perform (context->multi_handle, &still_running);
        break;
    }

    /* Re-take the context mutex before touching the queue again; it is
     * released at the "out" label */
    g_mutex_lock (&context->mutex);

    /*
     * Check the CURL message buffer to find out if any transfers have
     * completed. If they have, call the signal_finished function which
     * will signal the g_cond_wait call in that calling instance.
     */
    i = 0;
    while (cond != TRUE) {
      curl_message = curl_multi_info_read (context->multi_handle, &i);
      if (curl_message == NULL) {
        /* No more messages queued by curl: stop draining */
        cond = TRUE;
      } else if (curl_message->msg == CURLMSG_DONE) {
        /* A hack, but I have seen curl_message->easy_handle being
         * NULL randomly, so check for that. */
        if (curl_message->easy_handle != NULL) {
          curl_multi_remove_handle (context->multi_handle,
              curl_message->easy_handle);
          gst_curl_http_src_remove_queue_handle (&context->queue,
              curl_message->easy_handle, curl_message->data.result);
        }
      }
    }
  }
out:
  g_mutex_unlock (&context->mutex);
}
1785 
1786 /*
1787  * Receive headers from the remote server and put them into the http_headers
1788  * structure to be sent downstream when we've got them all and started receiving
1789  * the body (see ::_handle_response())
1790  */
1791 static size_t
gst_curl_http_src_get_header(void * header,size_t size,size_t nmemb,void * src)1792 gst_curl_http_src_get_header (void *header, size_t size, size_t nmemb,
1793     void *src)
1794 {
1795   GstCurlHttpSrc *s = src;
1796   char *substr;
1797 
1798   GST_DEBUG_OBJECT (s, "Received header: %s", (char *) header);
1799 
1800   g_mutex_lock (&s->buffer_mutex);
1801 
1802   if (s->state == GSTCURL_UNLOCK) {
1803     g_mutex_unlock (&s->buffer_mutex);
1804     return size * nmemb;
1805   }
1806 
1807   if (s->http_headers == NULL) {
1808     /* Can't do anything here, so just silently swallow the header */
1809     GST_DEBUG_OBJECT (s, "HTTP Headers Structure has already been sent,"
1810         " ignoring header");
1811     g_mutex_unlock (&s->buffer_mutex);
1812     return size * nmemb;
1813   }
1814 
1815   substr = gst_curl_http_src_strcasestr (header, "HTTP");
1816   if (substr == header) {
1817     /* We have a status line! */
1818     gchar **status_line_fields;
1819 
1820     /* Have we already seen a status line? If so, delete any response headers */
1821     if (s->status_code > 0) {
1822       GstStructure *empty_headers =
1823           gst_structure_new_empty (RESPONSE_HEADERS_NAME);
1824       gst_structure_remove_field (s->http_headers, RESPONSE_HEADERS_NAME);
1825       gst_structure_set (s->http_headers, RESPONSE_HEADERS_NAME,
1826           GST_TYPE_STRUCTURE, empty_headers, NULL);
1827       gst_structure_free (empty_headers);
1828 
1829     }
1830 
1831     /* Process the status line */
1832     status_line_fields = g_strsplit ((gchar *) header, " ", 3);
1833     if (status_line_fields == NULL) {
1834       GST_ERROR_OBJECT (s, "Status line processing failed!");
1835     } else {
1836       s->status_code =
1837           (guint) g_ascii_strtoll (status_line_fields[1], NULL, 10);
1838       g_free (s->reason_phrase);
1839       s->reason_phrase = g_strdup (status_line_fields[2]);
1840       GST_INFO_OBJECT (s, "Received status %u for request for URI %s: %s",
1841           s->status_code, s->uri, s->reason_phrase);
1842       gst_structure_set (s->http_headers, HTTP_STATUS_CODE,
1843           G_TYPE_UINT, s->status_code, NULL);
1844       g_strfreev (status_line_fields);
1845     }
1846   } else {
1847     /* Normal header line */
1848     gchar **header_tpl = g_strsplit ((gchar *) header, ": ", 2);
1849     if (header_tpl == NULL) {
1850       GST_ERROR_OBJECT (s, "Header processing failed! (%s)", (gchar *) header);
1851     } else {
1852       const GValue *gv_resp_hdrs = gst_structure_get_value (s->http_headers,
1853           RESPONSE_HEADERS_NAME);
1854       const GstStructure *response_headers =
1855           gst_value_get_structure (gv_resp_hdrs);
1856       /* Store header key lower case (g_ascii_strdown), makes searching through
1857        * later on easier - end applications shouldn't care, as all HTTP headers
1858        * are case-insensitive */
1859       gchar *header_key = g_ascii_strdown (header_tpl[0], -1);
1860       gchar *header_value;
1861 
1862       /* If header field already exists, append to the end */
1863       if (gst_structure_has_field (response_headers, header_key) == TRUE) {
1864         header_value = g_strdup_printf ("%s, %s",
1865             gst_structure_get_string (response_headers, header_key),
1866             header_tpl[1]);
1867         gst_structure_set ((GstStructure *) response_headers, header_key,
1868             G_TYPE_STRING, header_value, NULL);
1869         g_free (header_value);
1870       } else {
1871         header_value = header_tpl[1];
1872         gst_structure_set ((GstStructure *) response_headers, header_key,
1873             G_TYPE_STRING, header_value, NULL);
1874       }
1875 
1876       /* We have some special cases - deal with them here */
1877       if (g_strcmp0 (header_key, "content-type") == 0) {
1878         gst_curl_http_src_negotiate_caps (src);
1879       }
1880 
1881       g_free (header_key);
1882       g_strfreev (header_tpl);
1883     }
1884   }
1885 
1886   s->hdrs_updated = TRUE;
1887 
1888   g_mutex_unlock (&s->buffer_mutex);
1889 
1890   return size * nmemb;
1891 }
1892 
/*
 * Portable replacement for strcasestr(), which is a GNU extension (i.e. not
 * portable) and not always guaranteed to be available.
 *
 * Returns a pointer to the FIRST occurrence of needle within haystack,
 * compared case-insensitively, or NULL if there is none. An empty needle
 * matches at the start of haystack. Both arguments must be valid
 * NUL-terminated strings.
 *
 * Returning the first occurrence matters to the status line detection in the
 * header callback: a line such as "HTTP/1.1 505 HTTP Version Not Supported"
 * contains the needle twice and must still be reported as starting with it.
 */
static char *
gst_curl_http_src_strcasestr (const char *haystack, const char *needle)
{
  const size_t needle_len = strlen (needle);
  const char *start;

  if (needle_len == 0) {
    return (char *) haystack;
  }

  for (start = haystack; *start != '\0'; start++) {
    size_t matched = 0;

    /* Compare from every start position, so a failed partial match can never
     * hide a real match that overlaps it. The casts to unsigned char keep
     * tolower() defined for bytes with the top bit set. */
    while (matched < needle_len && start[matched] != '\0'
        && tolower ((unsigned char) start[matched]) ==
        tolower ((unsigned char) needle[matched])) {
      matched++;
    }

    if (matched == needle_len) {
      return (char *) start;
    }
  }

  return NULL;
}
1927 
1928 /*
1929  * Receive chunks of the requested body and pass these back to the ::create()
1930  * loop
1931  */
1932 static size_t
gst_curl_http_src_get_chunks(void * chunk,size_t size,size_t nmemb,void * src)1933 gst_curl_http_src_get_chunks (void *chunk, size_t size, size_t nmemb, void *src)
1934 {
1935   GstCurlHttpSrc *s = src;
1936   size_t chunk_len = size * nmemb;
1937   GST_TRACE_OBJECT (s,
1938       "Received curl chunk for URI %s of size %d", s->uri, (int) chunk_len);
1939   g_mutex_lock (&s->buffer_mutex);
1940   if (s->state == GSTCURL_UNLOCK) {
1941     g_mutex_unlock (&s->buffer_mutex);
1942     return chunk_len;
1943   }
1944   s->buffer =
1945       g_realloc (s->buffer, (s->buffer_len + chunk_len + 1) * sizeof (char));
1946   if (s->buffer == NULL) {
1947     GST_ERROR_OBJECT (s, "Realloc for cURL response message failed!\n");
1948     return 0;
1949   }
1950   memcpy (s->buffer + s->buffer_len, chunk, chunk_len);
1951   s->buffer_len += chunk_len;
1952   g_cond_signal (&s->buffer_cond);
1953   g_mutex_unlock (&s->buffer_mutex);
1954   return chunk_len;
1955 }
1956 
1957 /*
1958  * Request a cancellation of a currently running curl handle.
1959  */
1960 static void
gst_curl_http_src_request_remove(GstCurlHttpSrc * src)1961 gst_curl_http_src_request_remove (GstCurlHttpSrc * src)
1962 {
1963   GstCurlHttpSrcClass *klass = G_TYPE_INSTANCE_GET_CLASS (src,
1964       GST_TYPE_CURL_HTTP_SRC,
1965       GstCurlHttpSrcClass);
1966 
1967   g_mutex_lock (&klass->multi_task_context.mutex);
1968   g_mutex_lock (&src->buffer_mutex);
1969   if (src->connection_status == GSTCURL_CONNECTED) {
1970     src->connection_status = GSTCURL_WANT_REMOVAL;
1971   }
1972   g_mutex_unlock (&src->buffer_mutex);
1973   g_cond_signal (&klass->multi_task_context.signal);
1974   g_mutex_unlock (&klass->multi_task_context.mutex);
1975 }
1976 
1977 /*
1978  * Request a cancellation of a currently running curl handle and
1979  * block this thread until the src element has been removed
1980  * from the queue
1981  */
1982 static void
gst_curl_http_src_wait_until_removed(GstCurlHttpSrc * src)1983 gst_curl_http_src_wait_until_removed (GstCurlHttpSrc * src)
1984 {
1985   gst_curl_http_src_request_remove (src);
1986   g_mutex_lock (&src->buffer_mutex);
1987   while (src->connection_status != GSTCURL_NOT_CONNECTED) {
1988     g_cond_wait (&src->buffer_cond, &src->buffer_mutex);
1989   }
1990   g_mutex_unlock (&src->buffer_mutex);
1991 }
1992