1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@entropywave.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17  * Boston, MA 02110-1335, USA.
18  */
19 /**
20  * SECTION:element-gstsouphttpclientsink
21  *
22  * The souphttpclientsink element sends pipeline data to an HTTP server
23  * using HTTP PUT commands.
24  *
25  * <refsect2>
26  * <title>Example launch line</title>
27  * |[
28  * gst-launch-1.0 -v videotestsrc num-buffers=300 ! theoraenc ! oggmux !
29  *   souphttpclientsink location=http://server/filename.ogv
30  * ]|
31  *
32  * This example encodes 10 seconds of video and sends it to the HTTP
33  * server "server" using HTTP PUT commands.
34  * </refsect2>
35  */
36 
37 #ifdef HAVE_CONFIG_H
38 #include "config.h"
39 #endif
40 
41 #include <gst/gst.h>
42 #include <gst/base/gstbasesink.h>
43 #include "gstsouphttpclientsink.h"
44 #include "gstsouputils.h"
45 
46 GST_DEBUG_CATEGORY_STATIC (souphttpclientsink_dbg);
47 #define GST_CAT_DEFAULT souphttpclientsink_dbg
48 
49 /* prototypes */
50 
51 
52 static void gst_soup_http_client_sink_set_property (GObject * object,
53     guint property_id, const GValue * value, GParamSpec * pspec);
54 static void gst_soup_http_client_sink_get_property (GObject * object,
55     guint property_id, GValue * value, GParamSpec * pspec);
56 static void gst_soup_http_client_sink_dispose (GObject * object);
57 static void gst_soup_http_client_sink_finalize (GObject * object);
58 
59 static gboolean gst_soup_http_client_sink_set_caps (GstBaseSink * sink,
60     GstCaps * caps);
61 static void gst_soup_http_client_sink_get_times (GstBaseSink * sink,
62     GstBuffer * buffer, GstClockTime * start, GstClockTime * end);
63 static gboolean gst_soup_http_client_sink_start (GstBaseSink * sink);
64 static gboolean gst_soup_http_client_sink_stop (GstBaseSink * sink);
65 static gboolean gst_soup_http_client_sink_unlock (GstBaseSink * sink);
66 static gboolean gst_soup_http_client_sink_event (GstBaseSink * sink,
67     GstEvent * event);
68 static GstFlowReturn gst_soup_http_client_sink_preroll (GstBaseSink * sink,
69     GstBuffer * buffer);
70 static GstFlowReturn gst_soup_http_client_sink_render (GstBaseSink * sink,
71     GstBuffer * buffer);
72 
73 static void gst_soup_http_client_sink_reset (GstSoupHttpClientSink *
74     souphttpsink);
75 static void authenticate (SoupSession * session, SoupMessage * msg,
76     SoupAuth * auth, gboolean retrying, gpointer user_data);
77 static void callback (SoupSession * session, SoupMessage * msg,
78     gpointer user_data);
79 static gboolean gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink *
80     souphttpsink, const gchar * uri);
81 
/* GObject property identifiers; PROP_0 is the reserved "no property" id. */
enum
{
  PROP_0,
  PROP_LOCATION,                /* "location" */
  PROP_USER_AGENT,              /* "user-agent" */
  PROP_AUTOMATIC_REDIRECT,      /* "automatic-redirect" */
  PROP_PROXY,                   /* "proxy" */
  PROP_USER_ID,                 /* "user-id" */
  PROP_USER_PW,                 /* "user-pw" */
  PROP_PROXY_ID,                /* "proxy-id" */
  PROP_PROXY_PW,                /* "proxy-pw" */
  PROP_COOKIES,                 /* "cookies" */
  PROP_SESSION,                 /* "session" */
  PROP_SOUP_LOG_LEVEL,          /* "http-log-level" */
  PROP_RETRY_DELAY,             /* "retry-delay" */
  PROP_RETRIES                  /* "retries" */
};
99 
/* Default value of the "user-agent" property.
 * NOTE(review): the trailing space looks intentional -- libsoup documents
 * that it appends its own product token to a user agent ending in
 * whitespace; confirm against the SoupSession:user-agent documentation. */
#define DEFAULT_USER_AGENT           "GStreamer souphttpclientsink "
/* Default value of the "http-log-level" property: no libsoup logging. */
#define DEFAULT_SOUP_LOG_LEVEL       SOUP_LOGGER_LOG_NONE
102 
103 /* pad templates */
104 
105 static GstStaticPadTemplate gst_soup_http_client_sink_sink_template =
106 GST_STATIC_PAD_TEMPLATE ("sink",
107     GST_PAD_SINK,
108     GST_PAD_ALWAYS,
109     GST_STATIC_CAPS_ANY);
110 
111 
/* Type registration: GstSoupHttpClientSink derives from GstBaseSink.
 * The alias lets the code below chain up through the short name
 * `parent_class`. */

#define gst_soup_http_client_sink_parent_class parent_class
G_DEFINE_TYPE (GstSoupHttpClientSink, gst_soup_http_client_sink,
    GST_TYPE_BASE_SINK);
117 
118 static void
gst_soup_http_client_sink_class_init(GstSoupHttpClientSinkClass * klass)119 gst_soup_http_client_sink_class_init (GstSoupHttpClientSinkClass * klass)
120 {
121   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
122   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123   GstBaseSinkClass *base_sink_class = GST_BASE_SINK_CLASS (klass);
124 
125   gobject_class->set_property = gst_soup_http_client_sink_set_property;
126   gobject_class->get_property = gst_soup_http_client_sink_get_property;
127   gobject_class->dispose = gst_soup_http_client_sink_dispose;
128   gobject_class->finalize = gst_soup_http_client_sink_finalize;
129 
130   g_object_class_install_property (gobject_class,
131       PROP_LOCATION,
132       g_param_spec_string ("location", "Location",
133           "URI to send to", "", G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
134   g_object_class_install_property (gobject_class,
135       PROP_USER_AGENT,
136       g_param_spec_string ("user-agent", "User-Agent",
137           "Value of the User-Agent HTTP request header field",
138           DEFAULT_USER_AGENT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
139   g_object_class_install_property (gobject_class,
140       PROP_AUTOMATIC_REDIRECT,
141       g_param_spec_boolean ("automatic-redirect", "automatic-redirect",
142           "Automatically follow HTTP redirects (HTTP Status Code 3xx)",
143           TRUE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
144   g_object_class_install_property (gobject_class,
145       PROP_PROXY,
146       g_param_spec_string ("proxy", "Proxy",
147           "HTTP proxy server URI", "",
148           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
149   g_object_class_install_property (gobject_class,
150       PROP_USER_ID,
151       g_param_spec_string ("user-id", "user-id",
152           "user id for authentication", "",
153           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
154   g_object_class_install_property (gobject_class, PROP_USER_PW,
155       g_param_spec_string ("user-pw", "user-pw",
156           "user password for authentication", "",
157           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
158   g_object_class_install_property (gobject_class, PROP_PROXY_ID,
159       g_param_spec_string ("proxy-id", "proxy-id",
160           "user id for proxy authentication", "",
161           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
162   g_object_class_install_property (gobject_class, PROP_PROXY_PW,
163       g_param_spec_string ("proxy-pw", "proxy-pw",
164           "user password for proxy authentication", "",
165           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
166   g_object_class_install_property (gobject_class, PROP_SESSION,
167       g_param_spec_object ("session", "session",
168           "SoupSession object to use for communication",
169           SOUP_TYPE_SESSION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
170   g_object_class_install_property (gobject_class, PROP_COOKIES,
171       g_param_spec_boxed ("cookies", "Cookies", "HTTP request cookies",
172           G_TYPE_STRV, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
173   g_object_class_install_property (gobject_class, PROP_RETRY_DELAY,
174       g_param_spec_int ("retry-delay", "Retry Delay",
175           "Delay in seconds between retries after a failure", 1, G_MAXINT, 5,
176           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
177   g_object_class_install_property (gobject_class, PROP_RETRIES,
178       g_param_spec_int ("retries", "Retries",
179           "Maximum number of retries, zero to disable, -1 to retry forever",
180           -1, G_MAXINT, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
181  /**
182    * GstSoupHttpClientSink::http-log-level:
183    *
184    * If set and > 0, captures and dumps HTTP session data as
185    * log messages if log level >= GST_LEVEL_TRACE
186    *
187    * Since: 1.4
188    */
189   g_object_class_install_property (gobject_class, PROP_SOUP_LOG_LEVEL,
190       g_param_spec_enum ("http-log-level", "HTTP log level",
191           "Set log level for soup's HTTP session log",
192           SOUP_TYPE_LOGGER_LOG_LEVEL, DEFAULT_SOUP_LOG_LEVEL,
193           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
194 
195   gst_element_class_add_static_pad_template (gstelement_class,
196       &gst_soup_http_client_sink_sink_template);
197 
198   gst_element_class_set_static_metadata (gstelement_class, "HTTP client sink",
199       "Generic", "Sends streams to HTTP server via PUT",
200       "David Schleef <ds@entropywave.com>");
201 
202   base_sink_class->set_caps =
203       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_set_caps);
204   if (0)
205     base_sink_class->get_times =
206         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_get_times);
207   base_sink_class->start = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_start);
208   base_sink_class->stop = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_stop);
209   base_sink_class->unlock =
210       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_unlock);
211   base_sink_class->event = GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_event);
212   if (0)
213     base_sink_class->preroll =
214         GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_preroll);
215   base_sink_class->render =
216       GST_DEBUG_FUNCPTR (gst_soup_http_client_sink_render);
217 
218   GST_DEBUG_CATEGORY_INIT (souphttpclientsink_dbg, "souphttpclientsink", 0,
219       "souphttpclientsink element");
220 
221 }
222 
223 static void
gst_soup_http_client_sink_init(GstSoupHttpClientSink * souphttpsink)224 gst_soup_http_client_sink_init (GstSoupHttpClientSink * souphttpsink)
225 {
226   const char *proxy;
227 
228   g_mutex_init (&souphttpsink->mutex);
229   g_cond_init (&souphttpsink->cond);
230 
231   souphttpsink->location = NULL;
232   souphttpsink->automatic_redirect = TRUE;
233   souphttpsink->user_agent = g_strdup (DEFAULT_USER_AGENT);
234   souphttpsink->user_id = NULL;
235   souphttpsink->user_pw = NULL;
236   souphttpsink->proxy_id = NULL;
237   souphttpsink->proxy_pw = NULL;
238   souphttpsink->prop_session = NULL;
239   souphttpsink->timeout = 1;
240   souphttpsink->log_level = DEFAULT_SOUP_LOG_LEVEL;
241   souphttpsink->retry_delay = 5;
242   souphttpsink->retries = 0;
243   proxy = g_getenv ("http_proxy");
244   if (proxy && !gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
245     GST_WARNING_OBJECT (souphttpsink,
246         "The proxy in the http_proxy env var (\"%s\") cannot be parsed.",
247         proxy);
248   }
249 
250   gst_soup_http_client_sink_reset (souphttpsink);
251 }
252 
253 static void
gst_soup_http_client_sink_reset(GstSoupHttpClientSink * souphttpsink)254 gst_soup_http_client_sink_reset (GstSoupHttpClientSink * souphttpsink)
255 {
256   g_list_free_full (souphttpsink->queued_buffers,
257       (GDestroyNotify) gst_buffer_unref);
258   souphttpsink->queued_buffers = NULL;
259   g_free (souphttpsink->reason_phrase);
260   souphttpsink->reason_phrase = NULL;
261   souphttpsink->status_code = 0;
262   souphttpsink->offset = 0;
263   souphttpsink->failures = 0;
264 
265   g_list_free_full (souphttpsink->streamheader_buffers,
266       (GDestroyNotify) gst_buffer_unref);
267   souphttpsink->streamheader_buffers = NULL;
268   g_list_free_full (souphttpsink->sent_buffers,
269       (GDestroyNotify) gst_buffer_unref);
270   souphttpsink->sent_buffers = NULL;
271 }
272 
273 static gboolean
gst_soup_http_client_sink_set_proxy(GstSoupHttpClientSink * souphttpsink,const gchar * uri)274 gst_soup_http_client_sink_set_proxy (GstSoupHttpClientSink * souphttpsink,
275     const gchar * uri)
276 {
277   if (souphttpsink->proxy) {
278     soup_uri_free (souphttpsink->proxy);
279     souphttpsink->proxy = NULL;
280   }
281   if (g_str_has_prefix (uri, "http://")) {
282     souphttpsink->proxy = soup_uri_new (uri);
283   } else {
284     gchar *new_uri = g_strconcat ("http://", uri, NULL);
285 
286     souphttpsink->proxy = soup_uri_new (new_uri);
287     g_free (new_uri);
288   }
289 
290   return TRUE;
291 }
292 
293 void
gst_soup_http_client_sink_set_property(GObject * object,guint property_id,const GValue * value,GParamSpec * pspec)294 gst_soup_http_client_sink_set_property (GObject * object, guint property_id,
295     const GValue * value, GParamSpec * pspec)
296 {
297   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
298 
299   g_mutex_lock (&souphttpsink->mutex);
300   switch (property_id) {
301     case PROP_SESSION:
302       if (souphttpsink->prop_session) {
303         g_object_unref (souphttpsink->prop_session);
304       }
305       souphttpsink->prop_session = g_value_dup_object (value);
306       break;
307     case PROP_LOCATION:
308       g_free (souphttpsink->location);
309       souphttpsink->location = g_value_dup_string (value);
310       souphttpsink->offset = 0;
311       if ((souphttpsink->location == NULL)
312           || !gst_uri_is_valid (souphttpsink->location)) {
313         GST_WARNING_OBJECT (souphttpsink,
314             "The location (\"%s\") set, is not a valid uri.",
315             souphttpsink->location);
316         g_free (souphttpsink->location);
317         souphttpsink->location = NULL;
318       }
319       break;
320     case PROP_USER_AGENT:
321       g_free (souphttpsink->user_agent);
322       souphttpsink->user_agent = g_value_dup_string (value);
323       break;
324     case PROP_AUTOMATIC_REDIRECT:
325       souphttpsink->automatic_redirect = g_value_get_boolean (value);
326       break;
327     case PROP_USER_ID:
328       g_free (souphttpsink->user_id);
329       souphttpsink->user_id = g_value_dup_string (value);
330       break;
331     case PROP_USER_PW:
332       g_free (souphttpsink->user_pw);
333       souphttpsink->user_pw = g_value_dup_string (value);
334       break;
335     case PROP_PROXY_ID:
336       g_free (souphttpsink->proxy_id);
337       souphttpsink->proxy_id = g_value_dup_string (value);
338       break;
339     case PROP_PROXY_PW:
340       g_free (souphttpsink->proxy_pw);
341       souphttpsink->proxy_pw = g_value_dup_string (value);
342       break;
343     case PROP_PROXY:
344     {
345       const gchar *proxy;
346 
347       proxy = g_value_get_string (value);
348 
349       if (proxy == NULL) {
350         GST_WARNING ("proxy property cannot be NULL");
351         goto done;
352       }
353       if (!gst_soup_http_client_sink_set_proxy (souphttpsink, proxy)) {
354         GST_WARNING ("badly formatted proxy URI");
355         goto done;
356       }
357       break;
358     }
359     case PROP_COOKIES:
360       g_strfreev (souphttpsink->cookies);
361       souphttpsink->cookies = g_strdupv (g_value_get_boxed (value));
362       break;
363     case PROP_SOUP_LOG_LEVEL:
364       souphttpsink->log_level = g_value_get_enum (value);
365       break;
366     case PROP_RETRY_DELAY:
367       souphttpsink->retry_delay = g_value_get_int (value);
368       break;
369     case PROP_RETRIES:
370       souphttpsink->retries = g_value_get_int (value);
371       break;
372     default:
373       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
374       break;
375   }
376 done:
377   g_mutex_unlock (&souphttpsink->mutex);
378 }
379 
380 void
gst_soup_http_client_sink_get_property(GObject * object,guint property_id,GValue * value,GParamSpec * pspec)381 gst_soup_http_client_sink_get_property (GObject * object, guint property_id,
382     GValue * value, GParamSpec * pspec)
383 {
384   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
385 
386   switch (property_id) {
387     case PROP_SESSION:
388       g_value_set_object (value, souphttpsink->prop_session);
389       break;
390     case PROP_LOCATION:
391       g_value_set_string (value, souphttpsink->location);
392       break;
393     case PROP_AUTOMATIC_REDIRECT:
394       g_value_set_boolean (value, souphttpsink->automatic_redirect);
395       break;
396     case PROP_USER_AGENT:
397       g_value_set_string (value, souphttpsink->user_agent);
398       break;
399     case PROP_USER_ID:
400       g_value_set_string (value, souphttpsink->user_id);
401       break;
402     case PROP_USER_PW:
403       g_value_set_string (value, souphttpsink->user_pw);
404       break;
405     case PROP_PROXY_ID:
406       g_value_set_string (value, souphttpsink->proxy_id);
407       break;
408     case PROP_PROXY_PW:
409       g_value_set_string (value, souphttpsink->proxy_pw);
410       break;
411     case PROP_PROXY:
412       if (souphttpsink->proxy == NULL)
413         g_value_set_static_string (value, "");
414       else {
415         char *proxy = soup_uri_to_string (souphttpsink->proxy, FALSE);
416 
417         g_value_set_string (value, proxy);
418         g_free (proxy);
419       }
420       break;
421     case PROP_COOKIES:
422       g_value_set_boxed (value, g_strdupv (souphttpsink->cookies));
423       break;
424     case PROP_SOUP_LOG_LEVEL:
425       g_value_set_enum (value, souphttpsink->log_level);
426       break;
427     case PROP_RETRY_DELAY:
428       g_value_set_int (value, souphttpsink->retry_delay);
429       break;
430     case PROP_RETRIES:
431       g_value_set_int (value, souphttpsink->retries);
432       break;
433     default:
434       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
435       break;
436   }
437 }
438 
439 void
gst_soup_http_client_sink_dispose(GObject * object)440 gst_soup_http_client_sink_dispose (GObject * object)
441 {
442   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
443 
444   /* clean up as possible.  may be called multiple times */
445   if (souphttpsink->prop_session)
446     g_object_unref (souphttpsink->prop_session);
447   souphttpsink->prop_session = NULL;
448 
449   G_OBJECT_CLASS (parent_class)->dispose (object);
450 }
451 
452 void
gst_soup_http_client_sink_finalize(GObject * object)453 gst_soup_http_client_sink_finalize (GObject * object)
454 {
455   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (object);
456 
457   /* clean up object here */
458 
459   g_free (souphttpsink->user_agent);
460   g_free (souphttpsink->user_id);
461   g_free (souphttpsink->user_pw);
462   g_free (souphttpsink->proxy_id);
463   g_free (souphttpsink->proxy_pw);
464   if (souphttpsink->proxy)
465     soup_uri_free (souphttpsink->proxy);
466   g_free (souphttpsink->location);
467   g_strfreev (souphttpsink->cookies);
468 
469   g_cond_clear (&souphttpsink->cond);
470   g_mutex_clear (&souphttpsink->mutex);
471 
472   G_OBJECT_CLASS (parent_class)->finalize (object);
473 }
474 
475 
476 
477 static gboolean
gst_soup_http_client_sink_set_caps(GstBaseSink * sink,GstCaps * caps)478 gst_soup_http_client_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
479 {
480   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
481   GstStructure *structure;
482   const GValue *value_array;
483   int i, n;
484 
485   GST_DEBUG_OBJECT (souphttpsink, "new stream headers set");
486   structure = gst_caps_get_structure (caps, 0);
487   value_array = gst_structure_get_value (structure, "streamheader");
488   if (value_array) {
489     g_list_free_full (souphttpsink->streamheader_buffers,
490         (GDestroyNotify) gst_buffer_unref);
491     souphttpsink->streamheader_buffers = NULL;
492 
493     n = gst_value_array_get_size (value_array);
494     for (i = 0; i < n; i++) {
495       const GValue *value;
496       GstBuffer *buffer;
497       value = gst_value_array_get_value (value_array, i);
498       buffer = GST_BUFFER (gst_value_get_buffer (value));
499       souphttpsink->streamheader_buffers =
500           g_list_append (souphttpsink->streamheader_buffers,
501           gst_buffer_ref (buffer));
502     }
503   }
504 
505   return TRUE;
506 }
507 
508 static void
gst_soup_http_client_sink_get_times(GstBaseSink * sink,GstBuffer * buffer,GstClockTime * start,GstClockTime * end)509 gst_soup_http_client_sink_get_times (GstBaseSink * sink, GstBuffer * buffer,
510     GstClockTime * start, GstClockTime * end)
511 {
512 
513 }
514 
515 static gboolean
thread_ready_idle_cb(gpointer data)516 thread_ready_idle_cb (gpointer data)
517 {
518   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (data);
519 
520   GST_LOG_OBJECT (souphttpsink, "thread ready");
521 
522   g_mutex_lock (&souphttpsink->mutex);
523   g_cond_signal (&souphttpsink->cond);
524   g_mutex_unlock (&souphttpsink->mutex);
525 
526   return FALSE;                 /* only run once */
527 }
528 
529 static gpointer
thread_func(gpointer ptr)530 thread_func (gpointer ptr)
531 {
532   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (ptr);
533 
534   GST_DEBUG ("thread start");
535 
536   g_main_loop_run (souphttpsink->loop);
537 
538   GST_DEBUG ("thread quit");
539 
540   return NULL;
541 }
542 
543 static gboolean
gst_soup_http_client_sink_start(GstBaseSink * sink)544 gst_soup_http_client_sink_start (GstBaseSink * sink)
545 {
546   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
547 
548   if (souphttpsink->prop_session) {
549     souphttpsink->session = souphttpsink->prop_session;
550   } else {
551     GSource *source;
552     GError *error = NULL;
553 
554     souphttpsink->context = g_main_context_new ();
555 
556     /* set up idle source to signal when the main loop is running and
557      * it's safe for ::stop() to call g_main_loop_quit() */
558     source = g_idle_source_new ();
559     g_source_set_callback (source, thread_ready_idle_cb, sink, NULL);
560     g_source_attach (source, souphttpsink->context);
561     g_source_unref (source);
562 
563     souphttpsink->loop = g_main_loop_new (souphttpsink->context, TRUE);
564 
565     g_mutex_lock (&souphttpsink->mutex);
566 
567     souphttpsink->thread = g_thread_try_new ("souphttpclientsink-thread",
568         thread_func, souphttpsink, &error);
569 
570     if (error != NULL) {
571       GST_DEBUG_OBJECT (souphttpsink, "failed to start thread, %s",
572           error->message);
573       g_error_free (error);
574       g_mutex_unlock (&souphttpsink->mutex);
575       return FALSE;
576     }
577 
578     GST_LOG_OBJECT (souphttpsink, "waiting for main loop thread to start up");
579     g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
580     g_mutex_unlock (&souphttpsink->mutex);
581     GST_LOG_OBJECT (souphttpsink, "main loop thread running");
582 
583     if (souphttpsink->proxy == NULL) {
584       souphttpsink->session =
585           soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
586           souphttpsink->context, SOUP_SESSION_USER_AGENT,
587           souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
588           SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_PROXY_RESOLVER_DEFAULT,
589           NULL);
590     } else {
591       souphttpsink->session =
592           soup_session_async_new_with_options (SOUP_SESSION_ASYNC_CONTEXT,
593           souphttpsink->context, SOUP_SESSION_USER_AGENT,
594           souphttpsink->user_agent, SOUP_SESSION_TIMEOUT, souphttpsink->timeout,
595           SOUP_SESSION_PROXY_URI, souphttpsink->proxy, NULL);
596     }
597 
598     g_signal_connect (souphttpsink->session, "authenticate",
599         G_CALLBACK (authenticate), souphttpsink);
600   }
601 
602   /* Set up logging */
603   gst_soup_util_log_setup (souphttpsink->session, souphttpsink->log_level,
604       GST_ELEMENT (souphttpsink));
605 
606   return TRUE;
607 }
608 
609 static gboolean
gst_soup_http_client_sink_stop(GstBaseSink * sink)610 gst_soup_http_client_sink_stop (GstBaseSink * sink)
611 {
612   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
613 
614   GST_DEBUG ("stop");
615 
616   if (souphttpsink->prop_session == NULL) {
617     soup_session_abort (souphttpsink->session);
618     g_object_unref (souphttpsink->session);
619   }
620 
621   g_mutex_lock (&souphttpsink->mutex);
622   if (souphttpsink->timer) {
623     g_source_destroy (souphttpsink->timer);
624     g_source_unref (souphttpsink->timer);
625     souphttpsink->timer = NULL;
626   }
627   g_mutex_unlock (&souphttpsink->mutex);
628 
629   if (souphttpsink->loop) {
630     g_main_loop_quit (souphttpsink->loop);
631     g_mutex_lock (&souphttpsink->mutex);
632     g_cond_signal (&souphttpsink->cond);
633     g_mutex_unlock (&souphttpsink->mutex);
634     g_thread_join (souphttpsink->thread);
635     g_main_loop_unref (souphttpsink->loop);
636     souphttpsink->loop = NULL;
637   }
638   if (souphttpsink->context) {
639     g_main_context_unref (souphttpsink->context);
640     souphttpsink->context = NULL;
641   }
642 
643   gst_soup_http_client_sink_reset (souphttpsink);
644 
645   return TRUE;
646 }
647 
648 static gboolean
gst_soup_http_client_sink_unlock(GstBaseSink * sink)649 gst_soup_http_client_sink_unlock (GstBaseSink * sink)
650 {
651   GST_DEBUG ("unlock");
652 
653   return TRUE;
654 }
655 
656 static gboolean
gst_soup_http_client_sink_event(GstBaseSink * sink,GstEvent * event)657 gst_soup_http_client_sink_event (GstBaseSink * sink, GstEvent * event)
658 {
659   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
660 
661   GST_DEBUG_OBJECT (souphttpsink, "event");
662 
663   if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
664     GST_DEBUG_OBJECT (souphttpsink, "got eos");
665     g_mutex_lock (&souphttpsink->mutex);
666     while (souphttpsink->message) {
667       GST_DEBUG_OBJECT (souphttpsink, "waiting");
668       g_cond_wait (&souphttpsink->cond, &souphttpsink->mutex);
669     }
670     g_mutex_unlock (&souphttpsink->mutex);
671     GST_DEBUG_OBJECT (souphttpsink, "finished eos");
672   }
673 
674   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
675 }
676 
677 static GstFlowReturn
gst_soup_http_client_sink_preroll(GstBaseSink * sink,GstBuffer * buffer)678 gst_soup_http_client_sink_preroll (GstBaseSink * sink, GstBuffer * buffer)
679 {
680   GST_DEBUG ("preroll");
681 
682   return GST_FLOW_OK;
683 }
684 
685 static void
send_message_locked(GstSoupHttpClientSink * souphttpsink)686 send_message_locked (GstSoupHttpClientSink * souphttpsink)
687 {
688   GList *g;
689   guint64 n;
690 
691   if (souphttpsink->queued_buffers == NULL || souphttpsink->message) {
692     return;
693   }
694 
695   /* If the URI went away, drop all these buffers */
696   if (souphttpsink->location == NULL) {
697     GST_DEBUG_OBJECT (souphttpsink, "URI went away, dropping queued buffers");
698     g_list_free_full (souphttpsink->queued_buffers,
699         (GDestroyNotify) gst_buffer_unref);
700     souphttpsink->queued_buffers = NULL;
701     return;
702   }
703 
704   souphttpsink->message = soup_message_new ("PUT", souphttpsink->location);
705   if (souphttpsink->message == NULL) {
706     GST_WARNING_OBJECT (souphttpsink,
707         "URI could not be parsed while creating message.");
708     g_list_free_full (souphttpsink->queued_buffers,
709         (GDestroyNotify) gst_buffer_unref);
710     souphttpsink->queued_buffers = NULL;
711     return;
712   }
713 
714   soup_message_set_flags (souphttpsink->message,
715       (souphttpsink->automatic_redirect ? 0 : SOUP_MESSAGE_NO_REDIRECT));
716 
717   if (souphttpsink->cookies) {
718     gchar **cookie;
719 
720     for (cookie = souphttpsink->cookies; *cookie != NULL; cookie++) {
721       soup_message_headers_append (souphttpsink->message->request_headers,
722           "Cookie", *cookie);
723     }
724   }
725 
726   n = 0;
727   if (souphttpsink->offset == 0) {
728     for (g = souphttpsink->streamheader_buffers; g; g = g_list_next (g)) {
729       GstBuffer *buffer = g->data;
730       GstMapInfo map;
731 
732       GST_DEBUG_OBJECT (souphttpsink, "queueing stream headers");
733       gst_buffer_map (buffer, &map, GST_MAP_READ);
734       /* Stream headers are updated whenever ::set_caps is called, so there's
735        * no guarantees about their lifetime and we ask libsoup to copy them
736        * into the message body with SOUP_MEMORY_COPY. */
737       soup_message_body_append (souphttpsink->message->request_body,
738           SOUP_MEMORY_COPY, map.data, map.size);
739       n += map.size;
740       gst_buffer_unmap (buffer, &map);
741     }
742   }
743 
744   for (g = souphttpsink->queued_buffers; g; g = g_list_next (g)) {
745     GstBuffer *buffer = g->data;
746     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_HEADER)) {
747       GstMapInfo map;
748 
749       gst_buffer_map (buffer, &map, GST_MAP_READ);
750       /* Queued buffers are only freed in the next iteration of the mainloop
751        * after the message body has been written out, so we don't need libsoup
752        * to copy those while appending to the body. However, if the buffer is
753        * used elsewhere, it should be copied. Hence, SOUP_MEMORY_TEMPORARY. */
754       soup_message_body_append (souphttpsink->message->request_body,
755           SOUP_MEMORY_TEMPORARY, map.data, map.size);
756       n += map.size;
757       gst_buffer_unmap (buffer, &map);
758     }
759   }
760 
761   if (souphttpsink->offset != 0) {
762     char *s;
763     s = g_strdup_printf ("bytes %" G_GUINT64_FORMAT "-%" G_GUINT64_FORMAT "/*",
764         souphttpsink->offset, souphttpsink->offset + n - 1);
765     soup_message_headers_append (souphttpsink->message->request_headers,
766         "Content-Range", s);
767     g_free (s);
768   }
769 
770   if (n == 0) {
771     GST_DEBUG_OBJECT (souphttpsink,
772         "total size of buffers queued is 0, freeing everything");
773     g_list_free_full (souphttpsink->queued_buffers,
774         (GDestroyNotify) gst_buffer_unref);
775     souphttpsink->queued_buffers = NULL;
776     g_object_unref (souphttpsink->message);
777     souphttpsink->message = NULL;
778     return;
779   }
780 
781   souphttpsink->sent_buffers = souphttpsink->queued_buffers;
782   souphttpsink->queued_buffers = NULL;
783 
784   GST_DEBUG_OBJECT (souphttpsink,
785       "queue message %" G_GUINT64_FORMAT " %" G_GUINT64_FORMAT,
786       souphttpsink->offset, n);
787   soup_session_queue_message (souphttpsink->session, souphttpsink->message,
788       callback, souphttpsink);
789 
790   souphttpsink->offset += n;
791 }
792 
793 static gboolean
send_message(GstSoupHttpClientSink * souphttpsink)794 send_message (GstSoupHttpClientSink * souphttpsink)
795 {
796   g_mutex_lock (&souphttpsink->mutex);
797   send_message_locked (souphttpsink);
798   if (souphttpsink->timer) {
799     g_source_destroy (souphttpsink->timer);
800     g_source_unref (souphttpsink->timer);
801     souphttpsink->timer = NULL;
802   }
803   g_mutex_unlock (&souphttpsink->mutex);
804 
805   return FALSE;
806 }
807 
808 static void
callback(SoupSession * session,SoupMessage * msg,gpointer user_data)809 callback (SoupSession * session, SoupMessage * msg, gpointer user_data)
810 {
811   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
812 
813   GST_DEBUG_OBJECT (souphttpsink, "callback status=%d %s",
814       msg->status_code, msg->reason_phrase);
815 
816   g_mutex_lock (&souphttpsink->mutex);
817   g_cond_signal (&souphttpsink->cond);
818   souphttpsink->message = NULL;
819 
820   if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
821     souphttpsink->failures++;
822     if (souphttpsink->retries &&
823         (souphttpsink->retries < 0 ||
824             souphttpsink->retries >= souphttpsink->failures)) {
825       guint64 retry_delay;
826       const char *retry_after =
827           soup_message_headers_get_one (msg->response_headers,
828           "Retry-After");
829       if (retry_after) {
830         gchar *end = NULL;
831         retry_delay = g_ascii_strtoull (retry_after, &end, 10);
832         if (end || errno) {
833           retry_delay = souphttpsink->retry_delay;
834         } else {
835           retry_delay = MAX (retry_delay, souphttpsink->retry_delay);
836         }
837         GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
838             "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
839             " seconds with Retry-After: %s)", msg->status_code,
840             msg->reason_phrase, retry_delay, retry_after);
841       } else {
842         retry_delay = souphttpsink->retry_delay;
843         GST_WARNING_OBJECT (souphttpsink, "Could not write to HTTP URI: "
844             "status: %d %s (retrying PUT after %" G_GINT64_FORMAT
845             " seconds)", msg->status_code, msg->reason_phrase, retry_delay);
846       }
847       souphttpsink->timer = g_timeout_source_new_seconds (retry_delay);
848       g_source_set_callback (souphttpsink->timer, (GSourceFunc) (send_message),
849           souphttpsink, NULL);
850       g_source_attach (souphttpsink->timer, souphttpsink->context);
851     } else {
852       souphttpsink->status_code = msg->status_code;
853       souphttpsink->reason_phrase = g_strdup (msg->reason_phrase);
854     }
855     g_mutex_unlock (&souphttpsink->mutex);
856     return;
857   }
858 
859   g_list_free_full (souphttpsink->sent_buffers,
860       (GDestroyNotify) gst_buffer_unref);
861   souphttpsink->sent_buffers = NULL;
862   souphttpsink->failures = 0;
863 
864   send_message_locked (souphttpsink);
865   g_mutex_unlock (&souphttpsink->mutex);
866 }
867 
868 static GstFlowReturn
gst_soup_http_client_sink_render(GstBaseSink * sink,GstBuffer * buffer)869 gst_soup_http_client_sink_render (GstBaseSink * sink, GstBuffer * buffer)
870 {
871   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (sink);
872   GSource *source;
873   gboolean wake;
874 
875   if (souphttpsink->status_code != 0) {
876     GST_ELEMENT_ERROR (souphttpsink, RESOURCE, WRITE,
877         ("Could not write to HTTP URI"),
878         ("status: %d %s", souphttpsink->status_code,
879             souphttpsink->reason_phrase));
880     return GST_FLOW_ERROR;
881   }
882 
883   g_mutex_lock (&souphttpsink->mutex);
884   if (souphttpsink->location != NULL) {
885     wake = (souphttpsink->queued_buffers == NULL);
886     souphttpsink->queued_buffers =
887         g_list_append (souphttpsink->queued_buffers, gst_buffer_ref (buffer));
888 
889     if (wake) {
890       GST_DEBUG_OBJECT (souphttpsink, "setting callback for new buffers");
891       source = g_idle_source_new ();
892       g_source_set_callback (source, (GSourceFunc) (send_message),
893           souphttpsink, NULL);
894       g_source_attach (source, souphttpsink->context);
895       g_source_unref (source);
896     }
897   }
898   g_mutex_unlock (&souphttpsink->mutex);
899 
900   return GST_FLOW_OK;
901 }
902 
903 static void
authenticate(SoupSession * session,SoupMessage * msg,SoupAuth * auth,gboolean retrying,gpointer user_data)904 authenticate (SoupSession * session, SoupMessage * msg,
905     SoupAuth * auth, gboolean retrying, gpointer user_data)
906 {
907   GstSoupHttpClientSink *souphttpsink = GST_SOUP_HTTP_CLIENT_SINK (user_data);
908 
909   if (!retrying) {
910     /* First time authentication only, if we fail and are called again with retry true fall through */
911     if (msg->status_code == SOUP_STATUS_UNAUTHORIZED) {
912       if (souphttpsink->user_id && souphttpsink->user_pw)
913         soup_auth_authenticate (auth, souphttpsink->user_id,
914             souphttpsink->user_pw);
915     } else if (msg->status_code == SOUP_STATUS_PROXY_AUTHENTICATION_REQUIRED) {
916       if (souphttpsink->proxy_id && souphttpsink->proxy_pw)
917         soup_auth_authenticate (auth, souphttpsink->proxy_id,
918             souphttpsink->proxy_pw);
919     }
920   }
921 }
922