1 /* GStreamer
2  * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION:element-curlsink
22  * @title: curlsink
23  * @short_description: sink that uploads data to a server using libcurl
24  * @see_also:
25  *
26  * This is a network sink that uses libcurl as a client to upload data to
27  * a server (e.g. an HTTP or FTP server).
28  *
29  * ## Example launch line (upload a JPEG file to an HTTP server)
30  * |[
31  * gst-launch-1.0 filesrc location=image.jpg ! jpegparse ! curlsink  \
32  *     file-name=image.jpg  \
33  *     location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/  \
34  *     user=test passwd=test  \
35  *     content-type=image/jpeg  \
36  *     use-content-length=false
37  * ]|
38  *
39  */
40 
41 #ifdef HAVE_CONFIG_H
42 #include "config.h"
43 #endif
44 
45 #include <curl/curl.h>
46 #include <string.h>
47 #include <stdio.h>
48 
49 #if HAVE_SYS_SOCKET_H
50 #include <sys/socket.h>
51 #endif
52 #include <sys/types.h>
53 #if HAVE_NETINET_IN_H
54 #include <netinet/in.h>
55 #endif
56 #include <unistd.h>
57 #if HAVE_NETINET_IP_H
58 #include <netinet/ip.h>
59 #endif
60 #if HAVE_NETINET_TCP_H
61 #include <netinet/tcp.h>
62 #endif
63 #include <sys/stat.h>
64 #include <fcntl.h>
65 
66 #include "gstcurlbasesink.h"
67 
/* Debug category picked up by the GST_* logging macros used in this file. */
#define GST_CAT_DEFAULT gst_curl_base_sink_debug

/* Property defaults applied by the instance initializer. */
#define DEFAULT_URL "localhost:5555"
#define DEFAULT_TIMEOUT 30      /* seconds, see the "timeout" property */
#define DEFAULT_QOS_DSCP 0

/* Inclusive range accepted by the "qos-dscp" property. */
#define DSCP_MIN 0
#define DSCP_MAX 63
76 
77 
78 /* Plugin specific settings */
79 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
80     GST_PAD_SINK,
81     GST_PAD_ALWAYS,
82     GST_STATIC_CAPS_ANY);
83 
84 GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug);
85 
/* GObject property identifiers. PROP_0 is a placeholder so that the real
 * property ids start at 1. */
enum
{
  PROP_0 = 0,
  PROP_LOCATION,                /* "location"  */
  PROP_USER_NAME,               /* "user"      */
  PROP_USER_PASSWD,             /* "passwd"    */
  PROP_FILE_NAME,               /* "file-name" */
  PROP_TIMEOUT,                 /* "timeout"   */
  PROP_QOS_DSCP                 /* "qos-dscp"  */
};
96 
97 /* Object class function declarations */
98 static void gst_curl_base_sink_finalize (GObject * gobject);
99 static void gst_curl_base_sink_set_property (GObject * object, guint prop_id,
100     const GValue * value, GParamSpec * pspec);
101 static void gst_curl_base_sink_get_property (GObject * object, guint prop_id,
102     GValue * value, GParamSpec * pspec);
103 
104 /* BaseSink class function declarations */
105 static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink,
106     GstBuffer * buf);
107 static gboolean gst_curl_base_sink_event (GstBaseSink * bsink,
108     GstEvent * event);
109 static gboolean gst_curl_base_sink_start (GstBaseSink * bsink);
110 static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink);
111 static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink);
112 static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink);
113 
114 /* private functions */
115 
116 static gboolean gst_curl_base_sink_transfer_setup_unlocked
117     (GstCurlBaseSink * sink);
118 static gboolean gst_curl_base_sink_transfer_start_unlocked
119     (GstCurlBaseSink * sink);
120 static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink);
121 static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size,
122     size_t nmemb, void *stream);
123 static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size,
124     size_t nmemb, void *stream);
125 static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
126     void *curl_ptr, size_t block_size, guint * last_chunk);
127 #ifndef GST_DISABLE_GST_DEBUG
128 static int gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type,
129     char *data, size_t size, void *clientp);
130 #endif
131 static int gst_curl_base_sink_transfer_socket_cb (void *clientp,
132     curl_socket_t curlfd, curlsocktype purpose);
133 static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data);
134 static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink);
135 static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink);
136 
137 static gboolean gst_curl_base_sink_wait_for_data_unlocked
138     (GstCurlBaseSink * sink);
139 static void gst_curl_base_sink_new_file_notify_unlocked
140     (GstCurlBaseSink * sink);
141 static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
142     (GstCurlBaseSink * sink);
143 static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink);
144 static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink);
145 static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink);
146 
147 static void handle_transfer (GstCurlBaseSink * sink);
148 static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
149     size_t max_bytes_to_send, guint * last_chunk);
150 
151 #define parent_class gst_curl_base_sink_parent_class
152 G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
153 
154 static gboolean
gst_curl_base_sink_default_has_buffered_data_unlocked(GstCurlBaseSink * sink)155 gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
156 {
157   return sink->transfer_buf->len > 0;
158 }
159 
160 static gboolean
gst_curl_base_sink_has_buffered_data_unlocked(GstCurlBaseSink * sink)161 gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
162 {
163   GstCurlBaseSinkClass *klass;
164   gboolean res = FALSE;
165 
166   klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
167 
168   if (klass->has_buffered_data_unlocked)
169     res = klass->has_buffered_data_unlocked (sink);
170 
171   return res;
172 }
173 
174 static void
gst_curl_base_sink_class_init(GstCurlBaseSinkClass * klass)175 gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
176 {
177   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
178   GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
179   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
180 
181   GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0,
182       "curl base sink element");
183   GST_DEBUG_OBJECT (klass, "class_init");
184 
185   gst_element_class_set_static_metadata (element_class,
186       "Curl base sink",
187       "Sink/Network",
188       "Upload data over the network to a server using libcurl",
189       "Patricia Muscalu <patricia@axis.com>");
190 
191   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event);
192   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render);
193   gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start);
194   gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop);
195   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock);
196   gstbasesink_class->unlock_stop =
197       GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop);
198   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize);
199 
200   gobject_class->set_property = gst_curl_base_sink_set_property;
201   gobject_class->get_property = gst_curl_base_sink_get_property;
202 
203   klass->handle_transfer = handle_transfer;
204   klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
205   klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
206   klass->has_buffered_data_unlocked =
207       gst_curl_base_sink_default_has_buffered_data_unlocked;
208 
209   /* FIXME: check against souphttpsrc and use same names for same properties */
210   g_object_class_install_property (gobject_class, PROP_LOCATION,
211       g_param_spec_string ("location", "Location",
212           "URI location to write to", NULL,
213           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214   g_object_class_install_property (gobject_class, PROP_USER_NAME,
215       g_param_spec_string ("user", "User name",
216           "User name to use for server authentication", NULL,
217           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218   g_object_class_install_property (gobject_class, PROP_USER_PASSWD,
219       g_param_spec_string ("passwd", "User password",
220           "User password to use for server authentication", NULL,
221           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222   g_object_class_install_property (gobject_class, PROP_FILE_NAME,
223       g_param_spec_string ("file-name", "Base file name",
224           "The base file name for the uploaded images", NULL,
225           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
227       g_param_spec_int ("timeout", "Timeout",
228           "Number of seconds waiting to write before timeout",
229           0, G_MAXINT, DEFAULT_TIMEOUT,
230           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231   g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
232       g_param_spec_int ("qos-dscp",
233           "QoS diff srv code point",
234           "Quality of Service, differentiated services code point (0 default)",
235           DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
236           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237 
238   gst_element_class_add_static_pad_template (element_class, &sinktemplate);
239 }
240 
241 static void
gst_curl_base_sink_init(GstCurlBaseSink * sink)242 gst_curl_base_sink_init (GstCurlBaseSink * sink)
243 {
244   sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
245   sink->transfer_cond = g_malloc (sizeof (TransferCondition));
246   g_cond_init (&sink->transfer_cond->cond);
247   sink->transfer_cond->data_sent = FALSE;
248   sink->transfer_cond->data_available = FALSE;
249   sink->transfer_cond->wait_for_response = FALSE;
250   sink->timeout = DEFAULT_TIMEOUT;
251   sink->qos_dscp = DEFAULT_QOS_DSCP;
252   sink->url = g_strdup (DEFAULT_URL);
253   sink->transfer_thread_close = FALSE;
254   sink->new_file = TRUE;
255   sink->error = NULL;
256   sink->flow_ret = GST_FLOW_OK;
257   sink->is_live = FALSE;
258 }
259 
260 static void
gst_curl_base_sink_finalize(GObject * gobject)261 gst_curl_base_sink_finalize (GObject * gobject)
262 {
263   GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject);
264 
265   GST_DEBUG ("finalizing curlsink");
266   if (this->transfer_thread != NULL) {
267     g_thread_join (this->transfer_thread);
268   }
269 
270   g_cond_clear (&this->transfer_cond->cond);
271   g_free (this->transfer_cond);
272   g_free (this->transfer_buf);
273 
274   g_free (this->url);
275   g_free (this->user);
276   g_free (this->passwd);
277   g_free (this->file_name);
278   if (this->fdset != NULL) {
279     gst_poll_free (this->fdset);
280     this->fdset = NULL;
281   }
282   G_OBJECT_CLASS (parent_class)->finalize (gobject);
283 }
284 
285 void
gst_curl_base_sink_transfer_thread_notify_unlocked(GstCurlBaseSink * sink)286 gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink)
287 {
288   GST_LOG ("more data to send");
289 
290   sink->transfer_cond->data_available = TRUE;
291   sink->transfer_cond->data_sent = FALSE;
292   sink->transfer_cond->wait_for_response = TRUE;
293   g_cond_signal (&sink->transfer_cond->cond);
294 }
295 
296 void
gst_curl_base_sink_transfer_thread_close(GstCurlBaseSink * sink)297 gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink)
298 {
299   GST_OBJECT_LOCK (sink);
300   GST_LOG_OBJECT (sink, "setting transfer thread close flag");
301   sink->transfer_thread_close = TRUE;
302   g_cond_signal (&sink->transfer_cond->cond);
303   GST_OBJECT_UNLOCK (sink);
304 
305   if (sink->transfer_thread != NULL) {
306     GST_LOG_OBJECT (sink, "waiting for transfer thread to finish");
307     g_thread_join (sink->transfer_thread);
308     sink->transfer_thread = NULL;
309   }
310 }
311 
312 void
gst_curl_base_sink_set_live(GstCurlBaseSink * sink,gboolean live)313 gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live)
314 {
315   g_return_if_fail (GST_IS_CURL_BASE_SINK (sink));
316 
317   GST_OBJECT_LOCK (sink);
318   sink->is_live = live;
319   GST_OBJECT_UNLOCK (sink);
320 }
321 
322 gboolean
gst_curl_base_sink_is_live(GstCurlBaseSink * sink)323 gst_curl_base_sink_is_live (GstCurlBaseSink * sink)
324 {
325   gboolean result;
326 
327   g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE);
328 
329   GST_OBJECT_LOCK (sink);
330   result = sink->is_live;
331   GST_OBJECT_UNLOCK (sink);
332 
333   return result;
334 }
335 
336 static GstFlowReturn
gst_curl_base_sink_render(GstBaseSink * bsink,GstBuffer * buf)337 gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf)
338 {
339   GstCurlBaseSink *sink;
340   GstMapInfo map;
341   guint8 *data;
342   size_t size;
343   GstFlowReturn ret;
344   gchar *error;
345 
346   GST_LOG ("enter render");
347 
348   sink = GST_CURL_BASE_SINK (bsink);
349 
350   gst_buffer_map (buf, &map, GST_MAP_READ);
351   data = map.data;
352   size = map.size;
353 
354   if (size == 0) {
355     gst_buffer_unmap (buf, &map);
356     return GST_FLOW_OK;
357   }
358 
359   GST_OBJECT_LOCK (sink);
360 
361   /* check if the transfer thread has encountered problems while the
362    * pipeline thread was working elsewhere */
363   if (sink->flow_ret != GST_FLOW_OK) {
364     goto done;
365   }
366 
367   g_assert (sink->transfer_cond->data_available == FALSE);
368 
369   /* if there is no transfer thread created, lets create one */
370   if (sink->transfer_thread == NULL) {
371     if (!gst_curl_base_sink_transfer_start_unlocked (sink)) {
372       sink->flow_ret = GST_FLOW_ERROR;
373       goto done;
374     }
375   }
376 
377   /* make data available for the transfer thread and notify */
378   sink->transfer_buf->ptr = data;
379   sink->transfer_buf->len = size;
380   sink->transfer_buf->offset = 0;
381   gst_curl_base_sink_transfer_thread_notify_unlocked (sink);
382 
383   /* wait for the transfer thread to send the data. This will be notified
384    * either when transfer is completed by the curl read callback or by
385    * the thread function if an error has occurred. */
386   gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink);
387 
388 done:
389   gst_buffer_unmap (buf, &map);
390 
391   /* Hand over error from transfer thread to streaming thread */
392   error = sink->error;
393   sink->error = NULL;
394   ret = sink->flow_ret;
395   GST_OBJECT_UNLOCK (sink);
396 
397   if (error != NULL) {
398     GST_ERROR_OBJECT (sink, "%s", error);
399     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", error), (NULL));
400     g_free (error);
401   }
402 
403   GST_LOG ("exit render");
404 
405   return ret;
406 }
407 
408 static gboolean
gst_curl_base_sink_event(GstBaseSink * bsink,GstEvent * event)409 gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event)
410 {
411   GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
412   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
413 
414   switch (event->type) {
415     case GST_EVENT_EOS:
416       GST_DEBUG_OBJECT (sink, "received EOS");
417       gst_curl_base_sink_transfer_thread_close (sink);
418       gst_curl_base_sink_wait_for_response (sink);
419       break;
420     case GST_EVENT_CAPS:
421       if (klass->set_mime_type) {
422         GstCaps *caps;
423         gst_event_parse_caps (event, &caps);
424         klass->set_mime_type (sink, caps);
425       }
426       break;
427     default:
428       break;
429   }
430 
431   return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
432 }
433 
434 static gboolean
gst_curl_base_sink_start(GstBaseSink * bsink)435 gst_curl_base_sink_start (GstBaseSink * bsink)
436 {
437   GstCurlBaseSink *sink;
438 
439   sink = GST_CURL_BASE_SINK (bsink);
440 
441   /* reset flags */
442   sink->transfer_cond->data_sent = FALSE;
443   sink->transfer_cond->data_available = FALSE;
444   sink->transfer_cond->wait_for_response = FALSE;
445   sink->transfer_thread_close = FALSE;
446   sink->new_file = TRUE;
447   sink->flow_ret = GST_FLOW_OK;
448 
449   if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
450     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
451         ("gst_poll_new failed: %s", g_strerror (errno)), (NULL));
452     return FALSE;
453   }
454 
455   gst_poll_fd_init (&sink->fd);
456 
457   return TRUE;
458 }
459 
460 static gboolean
gst_curl_base_sink_stop(GstBaseSink * bsink)461 gst_curl_base_sink_stop (GstBaseSink * bsink)
462 {
463   GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
464 
465   gst_curl_base_sink_transfer_thread_close (sink);
466   if (sink->fdset != NULL) {
467     gst_poll_free (sink->fdset);
468     sink->fdset = NULL;
469   }
470 
471   return TRUE;
472 }
473 
474 static gboolean
gst_curl_base_sink_unlock(GstBaseSink * bsink)475 gst_curl_base_sink_unlock (GstBaseSink * bsink)
476 {
477   GstCurlBaseSink *sink;
478 
479   sink = GST_CURL_BASE_SINK (bsink);
480 
481   GST_LOG_OBJECT (sink, "Flushing");
482   gst_poll_set_flushing (sink->fdset, TRUE);
483 
484   return TRUE;
485 }
486 
487 static gboolean
gst_curl_base_sink_unlock_stop(GstBaseSink * bsink)488 gst_curl_base_sink_unlock_stop (GstBaseSink * bsink)
489 {
490   GstCurlBaseSink *sink;
491 
492   sink = GST_CURL_BASE_SINK (bsink);
493 
494   GST_LOG_OBJECT (sink, "No longer flushing");
495   gst_poll_set_flushing (sink->fdset, FALSE);
496 
497   return TRUE;
498 }
499 
500 static void
gst_curl_base_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)501 gst_curl_base_sink_set_property (GObject * object, guint prop_id,
502     const GValue * value, GParamSpec * pspec)
503 {
504   GstCurlBaseSink *sink;
505   GstState cur_state;
506 
507   g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
508   sink = GST_CURL_BASE_SINK (object);
509 
510   gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
511   if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
512     GST_OBJECT_LOCK (sink);
513 
514     switch (prop_id) {
515       case PROP_LOCATION:
516         g_free (sink->url);
517         sink->url = g_value_dup_string (value);
518         GST_DEBUG_OBJECT (sink, "url set to %s", sink->url);
519         break;
520       case PROP_USER_NAME:
521         g_free (sink->user);
522         sink->user = g_value_dup_string (value);
523         GST_DEBUG_OBJECT (sink, "user set to %s", sink->user);
524         break;
525       case PROP_USER_PASSWD:
526         g_free (sink->passwd);
527         sink->passwd = g_value_dup_string (value);
528         GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
529         break;
530       case PROP_FILE_NAME:
531         g_free (sink->file_name);
532         sink->file_name = g_value_dup_string (value);
533         GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
534         break;
535       case PROP_TIMEOUT:
536         sink->timeout = g_value_get_int (value);
537         GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
538         break;
539       case PROP_QOS_DSCP:
540         sink->qos_dscp = g_value_get_int (value);
541         gst_curl_base_sink_setup_dscp_unlocked (sink);
542         GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
543         break;
544       default:
545         GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
546         break;
547     }
548 
549     GST_OBJECT_UNLOCK (sink);
550 
551     return;
552   }
553 
554   /* in PLAYING or PAUSED state */
555   GST_OBJECT_LOCK (sink);
556 
557   switch (prop_id) {
558     case PROP_FILE_NAME:
559       g_free (sink->file_name);
560       sink->file_name = g_value_dup_string (value);
561       GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
562       gst_curl_base_sink_new_file_notify_unlocked (sink);
563       break;
564     case PROP_TIMEOUT:
565       sink->timeout = g_value_get_int (value);
566       GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
567       break;
568     case PROP_QOS_DSCP:
569       sink->qos_dscp = g_value_get_int (value);
570       gst_curl_base_sink_setup_dscp_unlocked (sink);
571       GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
572       break;
573     default:
574       GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
575       break;
576   }
577 
578   GST_OBJECT_UNLOCK (sink);
579 }
580 
581 static void
gst_curl_base_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)582 gst_curl_base_sink_get_property (GObject * object, guint prop_id,
583     GValue * value, GParamSpec * pspec)
584 {
585   GstCurlBaseSink *sink;
586 
587   g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
588   sink = GST_CURL_BASE_SINK (object);
589 
590   switch (prop_id) {
591     case PROP_LOCATION:
592       g_value_set_string (value, sink->url);
593       break;
594     case PROP_USER_NAME:
595       g_value_set_string (value, sink->user);
596       break;
597     case PROP_USER_PASSWD:
598       g_value_set_string (value, sink->passwd);
599       break;
600     case PROP_FILE_NAME:
601       g_value_set_string (value, sink->file_name);
602       break;
603     case PROP_TIMEOUT:
604       g_value_set_int (value, sink->timeout);
605       break;
606     case PROP_QOS_DSCP:
607       g_value_set_int (value, sink->qos_dscp);
608       break;
609     default:
610       GST_DEBUG_OBJECT (sink, "invalid property id");
611       break;
612   }
613 }
614 
615 static gboolean
gst_curl_base_sink_transfer_set_common_options_unlocked(GstCurlBaseSink * sink)616 gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink)
617 {
618   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
619   CURLcode res;
620 
621 #ifndef GST_DISABLE_GST_DEBUG
622   res = curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
623   if (res != CURLE_OK) {
624     sink->error = g_strdup_printf ("failed to set verbose: %s",
625         curl_easy_strerror (res));
626     return FALSE;
627   }
628   res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGDATA, sink);
629   if (res != CURLE_OK) {
630     sink->error = g_strdup_printf ("failed to set debug user_data: %s",
631         curl_easy_strerror (res));
632     return FALSE;
633   }
634   res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGFUNCTION,
635       gst_curl_base_sink_debug_cb);
636   if (res != CURLE_OK) {
637     sink->error = g_strdup_printf ("failed to set debug functions: %s",
638         curl_easy_strerror (res));
639     return FALSE;
640   }
641 #endif
642 
643   res = curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
644   if (res != CURLE_OK) {
645     sink->error = g_strdup_printf ("failed to set URL: %s",
646         curl_easy_strerror (res));
647     return FALSE;
648   }
649 
650   res = curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
651   if (res != CURLE_OK) {
652     sink->error = g_strdup_printf ("failed to set connection timeout: %s",
653         curl_easy_strerror (res));
654     return FALSE;
655   }
656 
657   /* using signals in a multi-threaded application is dangerous */
658   res = curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1);
659   if (res != CURLE_OK) {
660     sink->error = g_strdup_printf ("failed to set no signalling: %s",
661         curl_easy_strerror (res));
662     return FALSE;
663   }
664 
665   /* socket settings */
666   res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
667   if (res != CURLE_OK) {
668     sink->error = g_strdup_printf ("failed to set sockopt user data: %s",
669         curl_easy_strerror (res));
670     return FALSE;
671   }
672   res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
673       gst_curl_base_sink_transfer_socket_cb);
674   if (res != CURLE_OK) {
675     sink->error = g_strdup_printf ("failed to set sockopt function: %s",
676         curl_easy_strerror (res));
677     return FALSE;
678   }
679 
680   res = curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
681   if (res != CURLE_OK) {
682     sink->error = g_strdup_printf ("failed to set read user data: %s",
683         curl_easy_strerror (res));
684     return FALSE;
685   }
686   res = curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
687       klass->transfer_read_cb);
688   if (res != CURLE_OK) {
689     sink->error = g_strdup_printf ("failed to set read function: %s",
690         curl_easy_strerror (res));
691     return FALSE;
692   }
693 
694   res = curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink);
695   if (res != CURLE_OK) {
696     sink->error = g_strdup_printf ("failed to set write user data: %s",
697         curl_easy_strerror (res));
698     return FALSE;
699   }
700   res = curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
701       gst_curl_base_sink_transfer_write_cb);
702   if (res != CURLE_OK) {
703     sink->error = g_strdup_printf ("failed to set write function: %s",
704         curl_easy_strerror (res));
705     return FALSE;
706   }
707   /* Time out in case transfer speed in bytes per second stay below
708    * CURLOPT_LOW_SPEED_LIMIT during CURLOPT_LOW_SPEED_TIME */
709   res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_LIMIT, 1L);
710   if (res != CURLE_OK) {
711     sink->error = g_strdup_printf ("failed to set low speed limit: %s",
712         curl_easy_strerror (res));
713     return FALSE;
714   }
715   res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_TIME,
716       (long) sink->timeout);
717   if (res != CURLE_OK) {
718     sink->error = g_strdup_printf ("failed to set low speed time: %s",
719         curl_easy_strerror (res));
720     return FALSE;
721   }
722 
723   GST_LOG ("common options set");
724   return TRUE;
725 }
726 
727 static gboolean
gst_curl_base_sink_transfer_set_options_unlocked(GstCurlBaseSink * sink)728 gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink)
729 {
730   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
731   CURLcode res;
732 
733   if (!gst_curl_base_sink_transfer_set_common_options_unlocked (sink)) {
734     return FALSE;
735   }
736 
737   /* authentication settings */
738   if (sink->user != NULL && strlen (sink->user)) {
739     res = curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
740     if (res != CURLE_OK) {
741       sink->error = g_strdup_printf ("failed to set user name: %s",
742           curl_easy_strerror (res));
743       return FALSE;
744     }
745     res = curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
746     if (res != CURLE_OK) {
747       sink->error = g_strdup_printf ("failed to set password: %s",
748           curl_easy_strerror (res));
749       return FALSE;
750     }
751   }
752 
753   if (klass->set_options_unlocked) {
754     return klass->set_options_unlocked (sink);
755   } else {
756     return FALSE;
757   }
758 }
759 
760 static size_t
transfer_data_buffer(void * curl_ptr,TransferBuffer * buf,size_t max_bytes_to_send,guint * last_chunk)761 transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
762     size_t max_bytes_to_send, guint * last_chunk)
763 {
764   guint buf_len = buf->len;
765   size_t bytes_to_send = MIN (max_bytes_to_send, buf->len);
766 
767   memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send);
768   buf->offset = buf->offset + bytes_to_send;
769   buf->len = buf->len - bytes_to_send;
770 
771   /* the last data chunk */
772   if (bytes_to_send == buf_len) {
773     buf->offset = 0;
774     buf->len = 0;
775     *last_chunk = 1;
776   }
777 
778   GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
779 
780   return bytes_to_send;
781 }
782 
783 static size_t
gst_curl_base_sink_transfer_data_buffer(GstCurlBaseSink * sink,void * curl_ptr,size_t block_size,guint * last_chunk)784 gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
785     void *curl_ptr, size_t block_size, guint * last_chunk)
786 {
787   TransferBuffer *buffer;
788 
789   buffer = sink->transfer_buf;
790   GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
791       buffer->len, buffer->offset);
792 
793   if (buffer->len <= 0) {
794     GST_WARNING ("got zero- or negative-length buffer");
795 
796     return 0;
797   }
798 
799   /* more data in buffer(s) */
800   return transfer_data_buffer (curl_ptr, buffer, block_size, last_chunk);
801 }
802 
803 static size_t
gst_curl_base_sink_transfer_read_cb(void * curl_ptr,size_t size,size_t nmemb,void * stream)804 gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
805     void *stream)
806 {
807   GstCurlBaseSink *sink;
808   GstCurlBaseSinkClass *klass;
809   size_t max_bytes_to_send;
810   size_t bytes_to_send;
811   guint last_chunk = 0;
812 
813   sink = (GstCurlBaseSink *) stream;
814   klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
815 
816   max_bytes_to_send = size * nmemb;
817 
818   /* wait for data to come available, if new file or thread close is set
819    * then zero will be returned to indicate end of current transfer */
820   GST_OBJECT_LOCK (sink);
821   if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
822 
823     if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
824         sink->transfer_thread_close) {
825       GST_WARNING_OBJECT (sink,
826           "discarding render data due to thread close flag");
827 
828       GST_OBJECT_UNLOCK (sink);
829       return CURL_READFUNC_ABORT;
830     }
831 
832     if (klass->flush_data_unlocked) {
833       bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
834           max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
835 
836       GST_OBJECT_UNLOCK (sink);
837 
838       return bytes_to_send;
839     }
840 
841     GST_OBJECT_UNLOCK (sink);
842     GST_LOG ("returning 0, no more data to send in this file");
843 
844     return 0;
845   }
846 
847   GST_OBJECT_UNLOCK (sink);
848 
849   bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr,
850       max_bytes_to_send, &last_chunk);
851 
852   /* the last data chunk */
853   if (last_chunk) {
854     gst_curl_base_sink_data_sent_notify (sink);
855   }
856 
857   return bytes_to_send;
858 }
859 
860 static size_t
gst_curl_base_sink_transfer_write_cb(void G_GNUC_UNUSED * ptr,size_t size,size_t nmemb,void G_GNUC_UNUSED * stream)861 gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
862     size_t nmemb, void G_GNUC_UNUSED * stream)
863 {
864   GstCurlBaseSink *sink;
865   GstCurlBaseSinkClass *klass;
866   size_t realsize = size * nmemb;
867 
868   sink = (GstCurlBaseSink *) stream;
869   klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
870 
871   if (klass->transfer_verify_response_code) {
872     if (!klass->transfer_verify_response_code (sink)) {
873       GST_DEBUG_OBJECT (sink, "response error");
874       GST_OBJECT_LOCK (sink);
875       sink->flow_ret = GST_FLOW_ERROR;
876       GST_OBJECT_UNLOCK (sink);
877     }
878   }
879 
880   GST_DEBUG ("response %s", (gchar *) ptr);
881 
882   return realsize;
883 }
884 
885 CURLcode
gst_curl_base_sink_transfer_check(GstCurlBaseSink * sink)886 gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink)
887 {
888   CURLcode code = CURLE_OK;
889   CURL *easy;
890   CURLMsg *msg;
891   gint msgs_left;
892   gchar *eff_url = NULL;
893 
894   do {
895     easy = NULL;
896     while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) {
897       if (msg->msg == CURLMSG_DONE) {
898         easy = msg->easy_handle;
899         code = msg->data.result;
900         break;
901       }
902     }
903     if (easy) {
904       curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url);
905       GST_DEBUG ("transfer done %s (%s-%d)\n", eff_url,
906           curl_easy_strerror (code), code);
907     }
908   } while (easy);
909 
910   return code;
911 }
912 
913 static void
handle_transfer(GstCurlBaseSink * sink)914 handle_transfer (GstCurlBaseSink * sink)
915 {
916   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
917   gint retval;
918   gint activated_fds;
919   gint running_handles;
920   gint timeout;
921   CURLMcode m_code;
922   CURLcode e_code;
923 
924   GST_OBJECT_LOCK (sink);
925   timeout = sink->timeout;
926   GST_OBJECT_UNLOCK (sink);
927 
928   GST_DEBUG_OBJECT (sink, "handling transfers");
929 
930   /* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data
931      available to send or receive - call simply curl_multi_perform before
932      poll() on more actions */
933   do {
934     m_code = curl_multi_perform (sink->multi_handle, &running_handles);
935   } while (m_code == CURLM_CALL_MULTI_PERFORM);
936   GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
937 
938   while (running_handles && (m_code == CURLM_OK)) {
939     if (klass->transfer_prepare_poll_wait) {
940       klass->transfer_prepare_poll_wait (sink);
941     }
942 
943     activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
944     if (G_UNLIKELY (activated_fds == -1)) {
945       if (errno == EAGAIN || errno == EINTR) {
946         GST_DEBUG_OBJECT (sink, "interrupted by signal");
947       } else if (errno == EBUSY) {
948         GST_DEBUG_OBJECT (sink, "poll stopped");
949         retval = GST_FLOW_EOS;
950 
951         GST_OBJECT_LOCK (sink);
952         if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
953           GST_WARNING_OBJECT (sink,
954               "discarding render data due to thread close flag");
955         GST_OBJECT_UNLOCK (sink);
956 
957         goto fail;
958       } else {
959         sink->error = g_strdup_printf ("poll failed: %s", g_strerror (errno));
960         retval = GST_FLOW_ERROR;
961         goto fail;
962       }
963     } else if (G_UNLIKELY (activated_fds == 0)) {
964       sink->error = g_strdup_printf ("poll timed out after %" GST_TIME_FORMAT,
965           GST_TIME_ARGS (timeout * GST_SECOND));
966       retval = GST_FLOW_ERROR;
967       goto fail;
968     }
969 
970     /* readable/writable sockets */
971     do {
972       m_code = curl_multi_perform (sink->multi_handle, &running_handles);
973     } while (m_code == CURLM_CALL_MULTI_PERFORM);
974     GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
975   }
976 
977   if (m_code != CURLM_OK) {
978     sink->error = g_strdup_printf ("failed to write data: %s",
979         curl_multi_strerror (m_code));
980     retval = GST_FLOW_ERROR;
981     goto fail;
982   }
983 
984   /* problems still might have occurred on individual transfers even when
985    * curl_multi_perform returns CURLM_OK */
986   if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) {
987     sink->error = g_strdup_printf ("failed to transfer data: %s",
988         curl_easy_strerror (e_code));
989     retval = GST_FLOW_ERROR;
990     goto fail;
991   }
992 
993   gst_curl_base_sink_got_response_notify (sink);
994 
995   GST_OBJECT_LOCK (sink);
996   if (sink->socket_type == CURLSOCKTYPE_ACCEPT) {
997     /* FIXME: remove this again once we can depend on libcurl > 7.44.0,
998      * see https://github.com/bagder/curl/issues/405.
999      */
1000     if (G_UNLIKELY (sink->fd.fd < 0)) {
1001       sink->error = g_strdup_printf ("unknown error");
1002       retval = GST_FLOW_ERROR;
1003       GST_OBJECT_UNLOCK (sink);
1004       goto fail;
1005     }
1006     if (!gst_poll_remove_fd (sink->fdset, &sink->fd)) {
1007       sink->error = g_strdup_printf ("failed to remove fd");
1008       retval = GST_FLOW_ERROR;
1009       GST_OBJECT_UNLOCK (sink);
1010       goto fail;
1011     }
1012     sink->fd.fd = -1;
1013   }
1014   GST_OBJECT_UNLOCK (sink);
1015 
1016   return;
1017 
1018 fail:
1019   GST_OBJECT_LOCK (sink);
1020   if (sink->flow_ret == GST_FLOW_OK) {
1021     sink->flow_ret = retval;
1022   }
1023   GST_OBJECT_UNLOCK (sink);
1024   return;
1025 }
1026 
1027 #ifndef GST_DISABLE_GST_DEBUG
1028 static int
gst_curl_base_sink_debug_cb(CURL * handle,curl_infotype type,char * data,size_t size,void * clientp)1029 gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type, char *data,
1030     size_t size, void *clientp)
1031 {
1032   GstCurlBaseSink *sink = (GstCurlBaseSink *) clientp;
1033   gchar *msg = NULL;
1034 
1035   switch (type) {
1036     case CURLINFO_TEXT:
1037     case CURLINFO_HEADER_IN:
1038     case CURLINFO_HEADER_OUT:
1039       msg = g_memdup (data, size);
1040       if (size > 0) {
1041         msg[size - 1] = '\0';
1042         g_strchomp (msg);
1043       }
1044       break;
1045     default:
1046       break;
1047   }
1048 
1049   switch (type) {
1050     case CURLINFO_TEXT:
1051       GST_DEBUG_OBJECT (sink, "%s", msg);
1052       break;
1053     case CURLINFO_HEADER_IN:
1054       GST_DEBUG_OBJECT (sink, "incoming header: %s", msg);
1055       break;
1056     case CURLINFO_HEADER_OUT:
1057       GST_DEBUG_OBJECT (sink, "outgoing header: %s", msg);
1058       break;
1059     case CURLINFO_DATA_IN:
1060       GST_MEMDUMP_OBJECT (sink, "incoming data", (guint8 *) data, size);
1061       break;
1062     case CURLINFO_DATA_OUT:
1063       GST_MEMDUMP_OBJECT (sink, "outgoing data", (guint8 *) data, size);
1064       break;
1065     case CURLINFO_SSL_DATA_IN:
1066       GST_MEMDUMP_OBJECT (sink, "incoming ssl data", (guint8 *) data, size);
1067       break;
1068     case CURLINFO_SSL_DATA_OUT:
1069       GST_MEMDUMP_OBJECT (sink, "outgoing ssl data", (guint8 *) data, size);
1070       break;
1071     default:
1072       GST_DEBUG_OBJECT (sink, "unknown debug info type %d", type);
1073       GST_MEMDUMP_OBJECT (sink, "unknown data", (guint8 *) data, size);
1074       break;
1075   }
1076   g_free (msg);
1077   return 0;
1078 }
1079 #endif
1080 
1081 /* This function gets called by libcurl after the socket() call but before
1082  * the connect() call. */
1083 static int
gst_curl_base_sink_transfer_socket_cb(void * clientp,curl_socket_t curlfd,curlsocktype socket_type)1084 gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
1085     curlsocktype socket_type)
1086 {
1087   GstCurlBaseSink *sink;
1088   gboolean ret = TRUE;
1089 
1090   sink = (GstCurlBaseSink *) clientp;
1091 
1092   g_assert (sink);
1093 
1094   if (curlfd < 0) {
1095     /* signal an unrecoverable error to the library which will close the socket
1096        and return CURLE_COULDNT_CONNECT
1097      */
1098     GST_DEBUG_OBJECT (sink, "no curlfd");
1099     return 1;
1100   }
1101 
1102   GST_OBJECT_LOCK (sink);
1103   sink->socket_type = socket_type;
1104 
1105   if (sink->fd.fd != curlfd) {
1106     if (sink->fd.fd > 0 && sink->socket_type != CURLSOCKTYPE_ACCEPT) {
1107       ret &= gst_poll_remove_fd (sink->fdset, &sink->fd);
1108     }
1109     sink->fd.fd = curlfd;
1110     ret &= gst_poll_add_fd (sink->fdset, &sink->fd);
1111     ret &= gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE);
1112     ret &= gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
1113   }
1114   GST_DEBUG_OBJECT (sink, "fd: %d", sink->fd.fd);
1115   gst_curl_base_sink_setup_dscp_unlocked (sink);
1116   GST_OBJECT_UNLOCK (sink);
1117 
1118   /* success */
1119   return ret ? 0 : 1;
1120 }
1121 
1122 static gboolean
gst_curl_base_sink_transfer_start_unlocked(GstCurlBaseSink * sink)1123 gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink)
1124 {
1125   GError *error = NULL;
1126   gboolean ret = TRUE;
1127 
1128   GST_LOG ("creating transfer thread");
1129   sink->transfer_thread_close = FALSE;
1130   sink->new_file = TRUE;
1131   sink->transfer_thread = g_thread_try_new ("curl-transfer", (GThreadFunc)
1132       gst_curl_base_sink_transfer_thread_func, sink, &error);
1133 
1134   if (sink->transfer_thread == NULL || error != NULL) {
1135     ret = FALSE;
1136     if (error) {
1137       GST_ERROR_OBJECT (sink, "could not create thread %s", error->message);
1138       g_error_free (error);
1139     } else {
1140       GST_ERROR_OBJECT (sink, "could not create thread for unknown reason");
1141     }
1142   }
1143 
1144   return ret;
1145 }
1146 
1147 static gpointer
gst_curl_base_sink_transfer_thread_func(gpointer data)1148 gst_curl_base_sink_transfer_thread_func (gpointer data)
1149 {
1150   GstCurlBaseSink *sink = (GstCurlBaseSink *) data;
1151   GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
1152   GstFlowReturn ret;
1153   gboolean data_available;
1154 
1155   GST_LOG ("transfer thread started");
1156   GST_OBJECT_LOCK (sink);
1157   if (!gst_curl_base_sink_transfer_setup_unlocked (sink)) {
1158     /* no need to set sink->error, as it is set by the called function */
1159     sink->flow_ret = GST_FLOW_ERROR;
1160     goto done;
1161   }
1162 
1163   while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) {
1164     /* we are working on a new file, clearing flag and setting a new file
1165      * name */
1166     sink->new_file = FALSE;
1167 
1168     /* wait for data to arrive for this new file, if we get a new file name
1169      * again before getting data we will simply skip transferring anything
1170      * for this file and go directly to the new file */
1171     data_available = gst_curl_base_sink_wait_for_data_unlocked (sink);
1172     if (data_available) {
1173       if (G_UNLIKELY (!klass->set_protocol_dynamic_options_unlocked (sink))) {
1174         sink->error = g_strdup ("unexpected state");
1175         sink->flow_ret = GST_FLOW_ERROR;
1176         goto done;
1177       }
1178     }
1179 
1180     /* stay unlocked while handling the actual transfer */
1181     GST_OBJECT_UNLOCK (sink);
1182 
1183     if (data_available) {
1184       GST_LOG ("have data");
1185       if (!gst_curl_base_sink_is_live (sink)) {
1186         /* prepare transfer if needed */
1187         if (klass->prepare_transfer) {
1188           GST_OBJECT_LOCK (sink);
1189           if (!klass->prepare_transfer (sink)) {
1190             sink->flow_ret = GST_FLOW_ERROR;
1191             goto done;
1192           }
1193           GST_OBJECT_UNLOCK (sink);
1194         }
1195         GST_LOG ("adding handle");
1196         curl_multi_add_handle (sink->multi_handle, sink->curl);
1197       }
1198 
1199       /* Start driving the transfer. */
1200       klass->handle_transfer (sink);
1201 
1202       /* easy handle will be possibly re-used for next transfer, thus it needs
1203        * to be removed from the multi stack and re-added again */
1204       if (!gst_curl_base_sink_is_live (sink)) {
1205         GST_LOG ("removing handle");
1206         curl_multi_remove_handle (sink->multi_handle, sink->curl);
1207       }
1208     } else {
1209       GST_LOG ("have no data yet");
1210     }
1211 
1212     /* lock again before looping to check the thread closed flag */
1213     GST_OBJECT_LOCK (sink);
1214   }
1215 
1216   if (sink->is_live) {
1217     GST_LOG ("removing handle");
1218     curl_multi_remove_handle (sink->multi_handle, sink->curl);
1219   }
1220 
1221 done:
1222   gst_curl_base_sink_transfer_cleanup (sink);
1223 
1224   /* extract the error code so the lock does not have to be
1225    * taken when calling the functions below that take the lock
1226    * on their own */
1227   ret = sink->flow_ret;
1228   GST_OBJECT_UNLOCK (sink);
1229 
1230   /* if there is a flow error, always notify the render function so it
1231    * can return the flow error up along the pipeline. as an error has
1232    * occurred there is no response to receive, so notify the event function
1233    * so it doesn't block indefinitely waiting for a response. */
1234   if (ret != GST_FLOW_OK) {
1235     gst_curl_base_sink_data_sent_notify (sink);
1236     gst_curl_base_sink_got_response_notify (sink);
1237   }
1238 
1239   GST_DEBUG ("exit thread func - transfer thread close flag: %d",
1240       sink->transfer_thread_close);
1241 
1242   return NULL;
1243 }
1244 
1245 static gboolean
gst_curl_base_sink_transfer_setup_unlocked(GstCurlBaseSink * sink)1246 gst_curl_base_sink_transfer_setup_unlocked (GstCurlBaseSink * sink)
1247 {
1248   g_assert (sink);
1249 
1250   if (sink->curl == NULL) {
1251     /* curl_easy_init automatically calls curl_global_init(3) */
1252     if ((sink->curl = curl_easy_init ()) == NULL) {
1253       sink->error = g_strdup ("failed to init curl easy handle");
1254       return FALSE;
1255     }
1256   }
1257 
1258   if (!gst_curl_base_sink_transfer_set_options_unlocked (sink)) {
1259     if (!sink->error) {
1260       sink->error = g_strdup ("failed to setup curl easy handle");
1261     }
1262     return FALSE;
1263   }
1264 
1265   /* init a multi stack (non-blocking interface to libcurl) */
1266   if (sink->multi_handle == NULL) {
1267     if ((sink->multi_handle = curl_multi_init ()) == NULL) {
1268       sink->error = g_strdup ("failed to init curl multi handle");
1269       return FALSE;
1270     }
1271   }
1272 
1273   GST_LOG ("transfer setup done");
1274   return TRUE;
1275 }
1276 
1277 static void
gst_curl_base_sink_transfer_cleanup(GstCurlBaseSink * sink)1278 gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink)
1279 {
1280   if (sink->curl != NULL) {
1281     if (sink->multi_handle != NULL) {
1282       curl_multi_remove_handle (sink->multi_handle, sink->curl);
1283     }
1284     curl_easy_cleanup (sink->curl);
1285     sink->curl = NULL;
1286   }
1287 
1288   if (sink->multi_handle != NULL) {
1289     curl_multi_cleanup (sink->multi_handle);
1290     sink->multi_handle = NULL;
1291   }
1292 }
1293 
1294 static gboolean
gst_curl_base_sink_wait_for_data_unlocked(GstCurlBaseSink * sink)1295 gst_curl_base_sink_wait_for_data_unlocked (GstCurlBaseSink * sink)
1296 {
1297   gboolean data_available = FALSE;
1298 
1299   GST_LOG ("waiting for data");
1300   while (!sink->transfer_cond->data_available &&
1301       !sink->transfer_thread_close && !sink->new_file) {
1302     g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1303   }
1304 
1305   if (sink->transfer_thread_close) {
1306     GST_LOG ("wait for data aborted due to thread close");
1307   } else if (sink->new_file) {
1308     GST_LOG ("wait for data aborted due to new file name");
1309   } else {
1310     GST_LOG ("wait for data completed");
1311     data_available = TRUE;
1312   }
1313 
1314   return data_available;
1315 }
1316 
1317 static void
gst_curl_base_sink_new_file_notify_unlocked(GstCurlBaseSink * sink)1318 gst_curl_base_sink_new_file_notify_unlocked (GstCurlBaseSink * sink)
1319 {
1320   GST_LOG ("new file name");
1321   sink->new_file = TRUE;
1322   g_cond_signal (&sink->transfer_cond->cond);
1323 }
1324 
1325 static void
gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked(GstCurlBaseSink * sink)1326     gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
1327     (GstCurlBaseSink * sink)
1328 {
1329   GST_LOG ("waiting for buffer send to complete");
1330 
1331   /* this function should not check if the transfer thread is set to be closed
1332    * since that flag only can be set by the EOS event (by the pipeline thread).
1333    * This can therefore never happen while this function is running since this
1334    * function also is called by the pipeline thread (in the render function) */
1335   while (!sink->transfer_cond->data_sent) {
1336     g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1337   }
1338   GST_LOG ("buffer send completed");
1339 }
1340 
1341 static void
gst_curl_base_sink_data_sent_notify(GstCurlBaseSink * sink)1342 gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink)
1343 {
1344   GST_LOG ("transfer completed");
1345   GST_OBJECT_LOCK (sink);
1346   sink->transfer_cond->data_available = FALSE;
1347   sink->transfer_cond->data_sent = TRUE;
1348   g_cond_signal (&sink->transfer_cond->cond);
1349   GST_OBJECT_UNLOCK (sink);
1350 }
1351 
1352 static void
gst_curl_base_sink_wait_for_response(GstCurlBaseSink * sink)1353 gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink)
1354 {
1355   GST_LOG ("waiting for remote to send response code");
1356 
1357   GST_OBJECT_LOCK (sink);
1358   while (sink->transfer_cond->wait_for_response) {
1359     g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1360   }
1361   GST_OBJECT_UNLOCK (sink);
1362 
1363   GST_LOG ("response code received");
1364 }
1365 
1366 static void
gst_curl_base_sink_got_response_notify(GstCurlBaseSink * sink)1367 gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink)
1368 {
1369   GST_LOG ("got response code");
1370 
1371   GST_OBJECT_LOCK (sink);
1372   sink->transfer_cond->wait_for_response = FALSE;
1373   g_cond_signal (&sink->transfer_cond->cond);
1374   GST_OBJECT_UNLOCK (sink);
1375 }
1376 
1377 static gint
gst_curl_base_sink_setup_dscp_unlocked(GstCurlBaseSink * sink)1378 gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink)
1379 {
1380   gint tos;
1381   gint af;
1382   gint ret = -1;
1383   union
1384   {
1385     struct sockaddr sa;
1386     struct sockaddr_in6 sa_in6;
1387     struct sockaddr_storage sa_stor;
1388   } sa;
1389   socklen_t slen = sizeof (sa);
1390 
1391   if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) {
1392     GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
1393     return ret;
1394   }
1395   af = sa.sa.sa_family;
1396 
1397   /* if this is an IPv4-mapped address then do IPv4 QoS */
1398   if (af == AF_INET6) {
1399     GST_DEBUG_OBJECT (sink, "check IP6 socket");
1400     if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
1401       GST_DEBUG_OBJECT (sink, "mapped to IPV4");
1402       af = AF_INET;
1403     }
1404   }
1405   /* extract and shift 6 bits of the DSCP */
1406   tos = (sink->qos_dscp & 0x3f) << 2;
1407 
1408   switch (af) {
1409     case AF_INET:
1410       ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, (void *) &tos,
1411           sizeof (tos));
1412       break;
1413     case AF_INET6:
1414 #ifdef IPV6_TCLASS
1415       ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, (void *) &tos,
1416           sizeof (tos));
1417       break;
1418 #endif
1419     default:
1420       GST_ERROR_OBJECT (sink, "unsupported AF");
1421       break;
1422   }
1423   if (ret) {
1424     GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
1425   }
1426 
1427   return ret;
1428 }
1429