1 /* GStreamer
2 * Copyright (C) 2011 Axis Communications <dev-gstreamer@axis.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 /**
21 * SECTION:element-curlsink
22 * @title: curlsink
23 * @short_description: sink that uploads data to a server using libcurl
24 * @see_also:
25 *
26 * This is a network sink that uses libcurl as a client to upload data to
27 * a server (e.g. a HTTP/FTP server).
28 *
29 * ## Example launch line (upload a JPEG file to an HTTP server)
30 * |[
31 * gst-launch-1.0 filesrc location=image.jpg ! jpegparse ! curlsink \
32 * file-name=image.jpg \
33 * location=http://192.168.0.1:8080/cgi-bin/patupload.cgi/ \
34 * user=test passwd=test \
35 * content-type=image/jpeg \
36 * use-content-length=false
37 * ]|
38 *
39 */
40
41 #ifdef HAVE_CONFIG_H
42 #include "config.h"
43 #endif
44
45 #include <curl/curl.h>
46 #include <string.h>
47 #include <stdio.h>
48
49 #if HAVE_SYS_SOCKET_H
50 #include <sys/socket.h>
51 #endif
52 #include <sys/types.h>
53 #if HAVE_NETINET_IN_H
54 #include <netinet/in.h>
55 #endif
56 #include <unistd.h>
57 #if HAVE_NETINET_IP_H
58 #include <netinet/ip.h>
59 #endif
60 #if HAVE_NETINET_TCP_H
61 #include <netinet/tcp.h>
62 #endif
63 #include <sys/stat.h>
64 #include <fcntl.h>
65
66 #include "gstcurlbasesink.h"
67
68 /* Default values */
69 #define GST_CAT_DEFAULT gst_curl_base_sink_debug
70 #define DEFAULT_URL "localhost:5555"
71 #define DEFAULT_TIMEOUT 30
72 #define DEFAULT_QOS_DSCP 0
73
74 #define DSCP_MIN 0
75 #define DSCP_MAX 63
76
77
78 /* Plugin specific settings */
79 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
80 GST_PAD_SINK,
81 GST_PAD_ALWAYS,
82 GST_STATIC_CAPS_ANY);
83
84 GST_DEBUG_CATEGORY_STATIC (gst_curl_base_sink_debug);
85
/* Property identifiers used by set_property/get_property.  PROP_0 is a
 * placeholder so that the real property ids start at 1. */
enum
{
  PROP_0 = 0,
  PROP_LOCATION,                /* "location" */
  PROP_USER_NAME,               /* "user" */
  PROP_USER_PASSWD,             /* "passwd" */
  PROP_FILE_NAME,               /* "file-name" */
  PROP_TIMEOUT,                 /* "timeout" */
  PROP_QOS_DSCP                 /* "qos-dscp" */
};
96
97 /* Object class function declarations */
98 static void gst_curl_base_sink_finalize (GObject * gobject);
99 static void gst_curl_base_sink_set_property (GObject * object, guint prop_id,
100 const GValue * value, GParamSpec * pspec);
101 static void gst_curl_base_sink_get_property (GObject * object, guint prop_id,
102 GValue * value, GParamSpec * pspec);
103
104 /* BaseSink class function declarations */
105 static GstFlowReturn gst_curl_base_sink_render (GstBaseSink * bsink,
106 GstBuffer * buf);
107 static gboolean gst_curl_base_sink_event (GstBaseSink * bsink,
108 GstEvent * event);
109 static gboolean gst_curl_base_sink_start (GstBaseSink * bsink);
110 static gboolean gst_curl_base_sink_stop (GstBaseSink * bsink);
111 static gboolean gst_curl_base_sink_unlock (GstBaseSink * bsink);
112 static gboolean gst_curl_base_sink_unlock_stop (GstBaseSink * bsink);
113
114 /* private functions */
115
116 static gboolean gst_curl_base_sink_transfer_setup_unlocked
117 (GstCurlBaseSink * sink);
118 static gboolean gst_curl_base_sink_transfer_start_unlocked
119 (GstCurlBaseSink * sink);
120 static void gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink);
121 static size_t gst_curl_base_sink_transfer_read_cb (void *ptr, size_t size,
122 size_t nmemb, void *stream);
123 static size_t gst_curl_base_sink_transfer_write_cb (void *ptr, size_t size,
124 size_t nmemb, void *stream);
125 static size_t gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
126 void *curl_ptr, size_t block_size, guint * last_chunk);
127 #ifndef GST_DISABLE_GST_DEBUG
128 static int gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type,
129 char *data, size_t size, void *clientp);
130 #endif
131 static int gst_curl_base_sink_transfer_socket_cb (void *clientp,
132 curl_socket_t curlfd, curlsocktype purpose);
133 static gpointer gst_curl_base_sink_transfer_thread_func (gpointer data);
134 static gint gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink);
135 static CURLcode gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink);
136
137 static gboolean gst_curl_base_sink_wait_for_data_unlocked
138 (GstCurlBaseSink * sink);
139 static void gst_curl_base_sink_new_file_notify_unlocked
140 (GstCurlBaseSink * sink);
141 static void gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
142 (GstCurlBaseSink * sink);
143 static void gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink);
144 static void gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink);
145 static void gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink);
146
147 static void handle_transfer (GstCurlBaseSink * sink);
148 static size_t transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
149 size_t max_bytes_to_send, guint * last_chunk);
150
151 #define parent_class gst_curl_base_sink_parent_class
152 G_DEFINE_TYPE (GstCurlBaseSink, gst_curl_base_sink, GST_TYPE_BASE_SINK);
153
154 static gboolean
gst_curl_base_sink_default_has_buffered_data_unlocked(GstCurlBaseSink * sink)155 gst_curl_base_sink_default_has_buffered_data_unlocked (GstCurlBaseSink * sink)
156 {
157 return sink->transfer_buf->len > 0;
158 }
159
160 static gboolean
gst_curl_base_sink_has_buffered_data_unlocked(GstCurlBaseSink * sink)161 gst_curl_base_sink_has_buffered_data_unlocked (GstCurlBaseSink * sink)
162 {
163 GstCurlBaseSinkClass *klass;
164 gboolean res = FALSE;
165
166 klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
167
168 if (klass->has_buffered_data_unlocked)
169 res = klass->has_buffered_data_unlocked (sink);
170
171 return res;
172 }
173
174 static void
gst_curl_base_sink_class_init(GstCurlBaseSinkClass * klass)175 gst_curl_base_sink_class_init (GstCurlBaseSinkClass * klass)
176 {
177 GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
178 GstBaseSinkClass *gstbasesink_class = (GstBaseSinkClass *) klass;
179 GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
180
181 GST_DEBUG_CATEGORY_INIT (gst_curl_base_sink_debug, "curlbasesink", 0,
182 "curl base sink element");
183 GST_DEBUG_OBJECT (klass, "class_init");
184
185 gst_element_class_set_static_metadata (element_class,
186 "Curl base sink",
187 "Sink/Network",
188 "Upload data over the network to a server using libcurl",
189 "Patricia Muscalu <patricia@axis.com>");
190
191 gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_curl_base_sink_event);
192 gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_curl_base_sink_render);
193 gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_curl_base_sink_start);
194 gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_curl_base_sink_stop);
195 gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock);
196 gstbasesink_class->unlock_stop =
197 GST_DEBUG_FUNCPTR (gst_curl_base_sink_unlock_stop);
198 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_curl_base_sink_finalize);
199
200 gobject_class->set_property = gst_curl_base_sink_set_property;
201 gobject_class->get_property = gst_curl_base_sink_get_property;
202
203 klass->handle_transfer = handle_transfer;
204 klass->transfer_read_cb = gst_curl_base_sink_transfer_read_cb;
205 klass->transfer_data_buffer = gst_curl_base_sink_transfer_data_buffer;
206 klass->has_buffered_data_unlocked =
207 gst_curl_base_sink_default_has_buffered_data_unlocked;
208
209 /* FIXME: check against souphttpsrc and use same names for same properties */
210 g_object_class_install_property (gobject_class, PROP_LOCATION,
211 g_param_spec_string ("location", "Location",
212 "URI location to write to", NULL,
213 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
214 g_object_class_install_property (gobject_class, PROP_USER_NAME,
215 g_param_spec_string ("user", "User name",
216 "User name to use for server authentication", NULL,
217 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
218 g_object_class_install_property (gobject_class, PROP_USER_PASSWD,
219 g_param_spec_string ("passwd", "User password",
220 "User password to use for server authentication", NULL,
221 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
222 g_object_class_install_property (gobject_class, PROP_FILE_NAME,
223 g_param_spec_string ("file-name", "Base file name",
224 "The base file name for the uploaded images", NULL,
225 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
226 g_object_class_install_property (gobject_class, PROP_TIMEOUT,
227 g_param_spec_int ("timeout", "Timeout",
228 "Number of seconds waiting to write before timeout",
229 0, G_MAXINT, DEFAULT_TIMEOUT,
230 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
231 g_object_class_install_property (gobject_class, PROP_QOS_DSCP,
232 g_param_spec_int ("qos-dscp",
233 "QoS diff srv code point",
234 "Quality of Service, differentiated services code point (0 default)",
235 DSCP_MIN, DSCP_MAX, DEFAULT_QOS_DSCP,
236 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
237
238 gst_element_class_add_static_pad_template (element_class, &sinktemplate);
239 }
240
241 static void
gst_curl_base_sink_init(GstCurlBaseSink * sink)242 gst_curl_base_sink_init (GstCurlBaseSink * sink)
243 {
244 sink->transfer_buf = g_malloc (sizeof (TransferBuffer));
245 sink->transfer_cond = g_malloc (sizeof (TransferCondition));
246 g_cond_init (&sink->transfer_cond->cond);
247 sink->transfer_cond->data_sent = FALSE;
248 sink->transfer_cond->data_available = FALSE;
249 sink->transfer_cond->wait_for_response = FALSE;
250 sink->timeout = DEFAULT_TIMEOUT;
251 sink->qos_dscp = DEFAULT_QOS_DSCP;
252 sink->url = g_strdup (DEFAULT_URL);
253 sink->transfer_thread_close = FALSE;
254 sink->new_file = TRUE;
255 sink->error = NULL;
256 sink->flow_ret = GST_FLOW_OK;
257 sink->is_live = FALSE;
258 }
259
260 static void
gst_curl_base_sink_finalize(GObject * gobject)261 gst_curl_base_sink_finalize (GObject * gobject)
262 {
263 GstCurlBaseSink *this = GST_CURL_BASE_SINK (gobject);
264
265 GST_DEBUG ("finalizing curlsink");
266 if (this->transfer_thread != NULL) {
267 g_thread_join (this->transfer_thread);
268 }
269
270 g_cond_clear (&this->transfer_cond->cond);
271 g_free (this->transfer_cond);
272 g_free (this->transfer_buf);
273
274 g_free (this->url);
275 g_free (this->user);
276 g_free (this->passwd);
277 g_free (this->file_name);
278 if (this->fdset != NULL) {
279 gst_poll_free (this->fdset);
280 this->fdset = NULL;
281 }
282 G_OBJECT_CLASS (parent_class)->finalize (gobject);
283 }
284
285 void
gst_curl_base_sink_transfer_thread_notify_unlocked(GstCurlBaseSink * sink)286 gst_curl_base_sink_transfer_thread_notify_unlocked (GstCurlBaseSink * sink)
287 {
288 GST_LOG ("more data to send");
289
290 sink->transfer_cond->data_available = TRUE;
291 sink->transfer_cond->data_sent = FALSE;
292 sink->transfer_cond->wait_for_response = TRUE;
293 g_cond_signal (&sink->transfer_cond->cond);
294 }
295
296 void
gst_curl_base_sink_transfer_thread_close(GstCurlBaseSink * sink)297 gst_curl_base_sink_transfer_thread_close (GstCurlBaseSink * sink)
298 {
299 GST_OBJECT_LOCK (sink);
300 GST_LOG_OBJECT (sink, "setting transfer thread close flag");
301 sink->transfer_thread_close = TRUE;
302 g_cond_signal (&sink->transfer_cond->cond);
303 GST_OBJECT_UNLOCK (sink);
304
305 if (sink->transfer_thread != NULL) {
306 GST_LOG_OBJECT (sink, "waiting for transfer thread to finish");
307 g_thread_join (sink->transfer_thread);
308 sink->transfer_thread = NULL;
309 }
310 }
311
312 void
gst_curl_base_sink_set_live(GstCurlBaseSink * sink,gboolean live)313 gst_curl_base_sink_set_live (GstCurlBaseSink * sink, gboolean live)
314 {
315 g_return_if_fail (GST_IS_CURL_BASE_SINK (sink));
316
317 GST_OBJECT_LOCK (sink);
318 sink->is_live = live;
319 GST_OBJECT_UNLOCK (sink);
320 }
321
322 gboolean
gst_curl_base_sink_is_live(GstCurlBaseSink * sink)323 gst_curl_base_sink_is_live (GstCurlBaseSink * sink)
324 {
325 gboolean result;
326
327 g_return_val_if_fail (GST_IS_CURL_BASE_SINK (sink), FALSE);
328
329 GST_OBJECT_LOCK (sink);
330 result = sink->is_live;
331 GST_OBJECT_UNLOCK (sink);
332
333 return result;
334 }
335
336 static GstFlowReturn
gst_curl_base_sink_render(GstBaseSink * bsink,GstBuffer * buf)337 gst_curl_base_sink_render (GstBaseSink * bsink, GstBuffer * buf)
338 {
339 GstCurlBaseSink *sink;
340 GstMapInfo map;
341 guint8 *data;
342 size_t size;
343 GstFlowReturn ret;
344 gchar *error;
345
346 GST_LOG ("enter render");
347
348 sink = GST_CURL_BASE_SINK (bsink);
349
350 gst_buffer_map (buf, &map, GST_MAP_READ);
351 data = map.data;
352 size = map.size;
353
354 if (size == 0) {
355 gst_buffer_unmap (buf, &map);
356 return GST_FLOW_OK;
357 }
358
359 GST_OBJECT_LOCK (sink);
360
361 /* check if the transfer thread has encountered problems while the
362 * pipeline thread was working elsewhere */
363 if (sink->flow_ret != GST_FLOW_OK) {
364 goto done;
365 }
366
367 g_assert (sink->transfer_cond->data_available == FALSE);
368
369 /* if there is no transfer thread created, lets create one */
370 if (sink->transfer_thread == NULL) {
371 if (!gst_curl_base_sink_transfer_start_unlocked (sink)) {
372 sink->flow_ret = GST_FLOW_ERROR;
373 goto done;
374 }
375 }
376
377 /* make data available for the transfer thread and notify */
378 sink->transfer_buf->ptr = data;
379 sink->transfer_buf->len = size;
380 sink->transfer_buf->offset = 0;
381 gst_curl_base_sink_transfer_thread_notify_unlocked (sink);
382
383 /* wait for the transfer thread to send the data. This will be notified
384 * either when transfer is completed by the curl read callback or by
385 * the thread function if an error has occurred. */
386 gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked (sink);
387
388 done:
389 gst_buffer_unmap (buf, &map);
390
391 /* Hand over error from transfer thread to streaming thread */
392 error = sink->error;
393 sink->error = NULL;
394 ret = sink->flow_ret;
395 GST_OBJECT_UNLOCK (sink);
396
397 if (error != NULL) {
398 GST_ERROR_OBJECT (sink, "%s", error);
399 GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, ("%s", error), (NULL));
400 g_free (error);
401 }
402
403 GST_LOG ("exit render");
404
405 return ret;
406 }
407
408 static gboolean
gst_curl_base_sink_event(GstBaseSink * bsink,GstEvent * event)409 gst_curl_base_sink_event (GstBaseSink * bsink, GstEvent * event)
410 {
411 GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
412 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
413
414 switch (event->type) {
415 case GST_EVENT_EOS:
416 GST_DEBUG_OBJECT (sink, "received EOS");
417 gst_curl_base_sink_transfer_thread_close (sink);
418 gst_curl_base_sink_wait_for_response (sink);
419 break;
420 case GST_EVENT_CAPS:
421 if (klass->set_mime_type) {
422 GstCaps *caps;
423 gst_event_parse_caps (event, &caps);
424 klass->set_mime_type (sink, caps);
425 }
426 break;
427 default:
428 break;
429 }
430
431 return GST_BASE_SINK_CLASS (parent_class)->event (bsink, event);
432 }
433
434 static gboolean
gst_curl_base_sink_start(GstBaseSink * bsink)435 gst_curl_base_sink_start (GstBaseSink * bsink)
436 {
437 GstCurlBaseSink *sink;
438
439 sink = GST_CURL_BASE_SINK (bsink);
440
441 /* reset flags */
442 sink->transfer_cond->data_sent = FALSE;
443 sink->transfer_cond->data_available = FALSE;
444 sink->transfer_cond->wait_for_response = FALSE;
445 sink->transfer_thread_close = FALSE;
446 sink->new_file = TRUE;
447 sink->flow_ret = GST_FLOW_OK;
448
449 if ((sink->fdset = gst_poll_new (TRUE)) == NULL) {
450 GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_READ_WRITE,
451 ("gst_poll_new failed: %s", g_strerror (errno)), (NULL));
452 return FALSE;
453 }
454
455 gst_poll_fd_init (&sink->fd);
456
457 return TRUE;
458 }
459
460 static gboolean
gst_curl_base_sink_stop(GstBaseSink * bsink)461 gst_curl_base_sink_stop (GstBaseSink * bsink)
462 {
463 GstCurlBaseSink *sink = GST_CURL_BASE_SINK (bsink);
464
465 gst_curl_base_sink_transfer_thread_close (sink);
466 if (sink->fdset != NULL) {
467 gst_poll_free (sink->fdset);
468 sink->fdset = NULL;
469 }
470
471 return TRUE;
472 }
473
474 static gboolean
gst_curl_base_sink_unlock(GstBaseSink * bsink)475 gst_curl_base_sink_unlock (GstBaseSink * bsink)
476 {
477 GstCurlBaseSink *sink;
478
479 sink = GST_CURL_BASE_SINK (bsink);
480
481 GST_LOG_OBJECT (sink, "Flushing");
482 gst_poll_set_flushing (sink->fdset, TRUE);
483
484 return TRUE;
485 }
486
487 static gboolean
gst_curl_base_sink_unlock_stop(GstBaseSink * bsink)488 gst_curl_base_sink_unlock_stop (GstBaseSink * bsink)
489 {
490 GstCurlBaseSink *sink;
491
492 sink = GST_CURL_BASE_SINK (bsink);
493
494 GST_LOG_OBJECT (sink, "No longer flushing");
495 gst_poll_set_flushing (sink->fdset, FALSE);
496
497 return TRUE;
498 }
499
500 static void
gst_curl_base_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)501 gst_curl_base_sink_set_property (GObject * object, guint prop_id,
502 const GValue * value, GParamSpec * pspec)
503 {
504 GstCurlBaseSink *sink;
505 GstState cur_state;
506
507 g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
508 sink = GST_CURL_BASE_SINK (object);
509
510 gst_element_get_state (GST_ELEMENT (sink), &cur_state, NULL, 0);
511 if (cur_state != GST_STATE_PLAYING && cur_state != GST_STATE_PAUSED) {
512 GST_OBJECT_LOCK (sink);
513
514 switch (prop_id) {
515 case PROP_LOCATION:
516 g_free (sink->url);
517 sink->url = g_value_dup_string (value);
518 GST_DEBUG_OBJECT (sink, "url set to %s", sink->url);
519 break;
520 case PROP_USER_NAME:
521 g_free (sink->user);
522 sink->user = g_value_dup_string (value);
523 GST_DEBUG_OBJECT (sink, "user set to %s", sink->user);
524 break;
525 case PROP_USER_PASSWD:
526 g_free (sink->passwd);
527 sink->passwd = g_value_dup_string (value);
528 GST_DEBUG_OBJECT (sink, "passwd set to %s", sink->passwd);
529 break;
530 case PROP_FILE_NAME:
531 g_free (sink->file_name);
532 sink->file_name = g_value_dup_string (value);
533 GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
534 break;
535 case PROP_TIMEOUT:
536 sink->timeout = g_value_get_int (value);
537 GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
538 break;
539 case PROP_QOS_DSCP:
540 sink->qos_dscp = g_value_get_int (value);
541 gst_curl_base_sink_setup_dscp_unlocked (sink);
542 GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
543 break;
544 default:
545 GST_DEBUG_OBJECT (sink, "invalid property id %d", prop_id);
546 break;
547 }
548
549 GST_OBJECT_UNLOCK (sink);
550
551 return;
552 }
553
554 /* in PLAYING or PAUSED state */
555 GST_OBJECT_LOCK (sink);
556
557 switch (prop_id) {
558 case PROP_FILE_NAME:
559 g_free (sink->file_name);
560 sink->file_name = g_value_dup_string (value);
561 GST_DEBUG_OBJECT (sink, "file_name set to %s", sink->file_name);
562 gst_curl_base_sink_new_file_notify_unlocked (sink);
563 break;
564 case PROP_TIMEOUT:
565 sink->timeout = g_value_get_int (value);
566 GST_DEBUG_OBJECT (sink, "timeout set to %d", sink->timeout);
567 break;
568 case PROP_QOS_DSCP:
569 sink->qos_dscp = g_value_get_int (value);
570 gst_curl_base_sink_setup_dscp_unlocked (sink);
571 GST_DEBUG_OBJECT (sink, "dscp set to %d", sink->qos_dscp);
572 break;
573 default:
574 GST_WARNING_OBJECT (sink, "cannot set property when PLAYING");
575 break;
576 }
577
578 GST_OBJECT_UNLOCK (sink);
579 }
580
581 static void
gst_curl_base_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)582 gst_curl_base_sink_get_property (GObject * object, guint prop_id,
583 GValue * value, GParamSpec * pspec)
584 {
585 GstCurlBaseSink *sink;
586
587 g_return_if_fail (GST_IS_CURL_BASE_SINK (object));
588 sink = GST_CURL_BASE_SINK (object);
589
590 switch (prop_id) {
591 case PROP_LOCATION:
592 g_value_set_string (value, sink->url);
593 break;
594 case PROP_USER_NAME:
595 g_value_set_string (value, sink->user);
596 break;
597 case PROP_USER_PASSWD:
598 g_value_set_string (value, sink->passwd);
599 break;
600 case PROP_FILE_NAME:
601 g_value_set_string (value, sink->file_name);
602 break;
603 case PROP_TIMEOUT:
604 g_value_set_int (value, sink->timeout);
605 break;
606 case PROP_QOS_DSCP:
607 g_value_set_int (value, sink->qos_dscp);
608 break;
609 default:
610 GST_DEBUG_OBJECT (sink, "invalid property id");
611 break;
612 }
613 }
614
615 static gboolean
gst_curl_base_sink_transfer_set_common_options_unlocked(GstCurlBaseSink * sink)616 gst_curl_base_sink_transfer_set_common_options_unlocked (GstCurlBaseSink * sink)
617 {
618 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
619 CURLcode res;
620
621 #ifndef GST_DISABLE_GST_DEBUG
622 res = curl_easy_setopt (sink->curl, CURLOPT_VERBOSE, 1);
623 if (res != CURLE_OK) {
624 sink->error = g_strdup_printf ("failed to set verbose: %s",
625 curl_easy_strerror (res));
626 return FALSE;
627 }
628 res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGDATA, sink);
629 if (res != CURLE_OK) {
630 sink->error = g_strdup_printf ("failed to set debug user_data: %s",
631 curl_easy_strerror (res));
632 return FALSE;
633 }
634 res = curl_easy_setopt (sink->curl, CURLOPT_DEBUGFUNCTION,
635 gst_curl_base_sink_debug_cb);
636 if (res != CURLE_OK) {
637 sink->error = g_strdup_printf ("failed to set debug functions: %s",
638 curl_easy_strerror (res));
639 return FALSE;
640 }
641 #endif
642
643 res = curl_easy_setopt (sink->curl, CURLOPT_URL, sink->url);
644 if (res != CURLE_OK) {
645 sink->error = g_strdup_printf ("failed to set URL: %s",
646 curl_easy_strerror (res));
647 return FALSE;
648 }
649
650 res = curl_easy_setopt (sink->curl, CURLOPT_CONNECTTIMEOUT, sink->timeout);
651 if (res != CURLE_OK) {
652 sink->error = g_strdup_printf ("failed to set connection timeout: %s",
653 curl_easy_strerror (res));
654 return FALSE;
655 }
656
657 /* using signals in a multi-threaded application is dangerous */
658 res = curl_easy_setopt (sink->curl, CURLOPT_NOSIGNAL, 1);
659 if (res != CURLE_OK) {
660 sink->error = g_strdup_printf ("failed to set no signalling: %s",
661 curl_easy_strerror (res));
662 return FALSE;
663 }
664
665 /* socket settings */
666 res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTDATA, sink);
667 if (res != CURLE_OK) {
668 sink->error = g_strdup_printf ("failed to set sockopt user data: %s",
669 curl_easy_strerror (res));
670 return FALSE;
671 }
672 res = curl_easy_setopt (sink->curl, CURLOPT_SOCKOPTFUNCTION,
673 gst_curl_base_sink_transfer_socket_cb);
674 if (res != CURLE_OK) {
675 sink->error = g_strdup_printf ("failed to set sockopt function: %s",
676 curl_easy_strerror (res));
677 return FALSE;
678 }
679
680 res = curl_easy_setopt (sink->curl, CURLOPT_READDATA, sink);
681 if (res != CURLE_OK) {
682 sink->error = g_strdup_printf ("failed to set read user data: %s",
683 curl_easy_strerror (res));
684 return FALSE;
685 }
686 res = curl_easy_setopt (sink->curl, CURLOPT_READFUNCTION,
687 klass->transfer_read_cb);
688 if (res != CURLE_OK) {
689 sink->error = g_strdup_printf ("failed to set read function: %s",
690 curl_easy_strerror (res));
691 return FALSE;
692 }
693
694 res = curl_easy_setopt (sink->curl, CURLOPT_WRITEDATA, sink);
695 if (res != CURLE_OK) {
696 sink->error = g_strdup_printf ("failed to set write user data: %s",
697 curl_easy_strerror (res));
698 return FALSE;
699 }
700 res = curl_easy_setopt (sink->curl, CURLOPT_WRITEFUNCTION,
701 gst_curl_base_sink_transfer_write_cb);
702 if (res != CURLE_OK) {
703 sink->error = g_strdup_printf ("failed to set write function: %s",
704 curl_easy_strerror (res));
705 return FALSE;
706 }
707 /* Time out in case transfer speed in bytes per second stay below
708 * CURLOPT_LOW_SPEED_LIMIT during CURLOPT_LOW_SPEED_TIME */
709 res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_LIMIT, 1L);
710 if (res != CURLE_OK) {
711 sink->error = g_strdup_printf ("failed to set low speed limit: %s",
712 curl_easy_strerror (res));
713 return FALSE;
714 }
715 res = curl_easy_setopt (sink->curl, CURLOPT_LOW_SPEED_TIME,
716 (long) sink->timeout);
717 if (res != CURLE_OK) {
718 sink->error = g_strdup_printf ("failed to set low speed time: %s",
719 curl_easy_strerror (res));
720 return FALSE;
721 }
722
723 GST_LOG ("common options set");
724 return TRUE;
725 }
726
727 static gboolean
gst_curl_base_sink_transfer_set_options_unlocked(GstCurlBaseSink * sink)728 gst_curl_base_sink_transfer_set_options_unlocked (GstCurlBaseSink * sink)
729 {
730 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
731 CURLcode res;
732
733 if (!gst_curl_base_sink_transfer_set_common_options_unlocked (sink)) {
734 return FALSE;
735 }
736
737 /* authentication settings */
738 if (sink->user != NULL && strlen (sink->user)) {
739 res = curl_easy_setopt (sink->curl, CURLOPT_USERNAME, sink->user);
740 if (res != CURLE_OK) {
741 sink->error = g_strdup_printf ("failed to set user name: %s",
742 curl_easy_strerror (res));
743 return FALSE;
744 }
745 res = curl_easy_setopt (sink->curl, CURLOPT_PASSWORD, sink->passwd);
746 if (res != CURLE_OK) {
747 sink->error = g_strdup_printf ("failed to set password: %s",
748 curl_easy_strerror (res));
749 return FALSE;
750 }
751 }
752
753 if (klass->set_options_unlocked) {
754 return klass->set_options_unlocked (sink);
755 } else {
756 return FALSE;
757 }
758 }
759
760 static size_t
transfer_data_buffer(void * curl_ptr,TransferBuffer * buf,size_t max_bytes_to_send,guint * last_chunk)761 transfer_data_buffer (void *curl_ptr, TransferBuffer * buf,
762 size_t max_bytes_to_send, guint * last_chunk)
763 {
764 guint buf_len = buf->len;
765 size_t bytes_to_send = MIN (max_bytes_to_send, buf->len);
766
767 memcpy ((guint8 *) curl_ptr, buf->ptr + buf->offset, bytes_to_send);
768 buf->offset = buf->offset + bytes_to_send;
769 buf->len = buf->len - bytes_to_send;
770
771 /* the last data chunk */
772 if (bytes_to_send == buf_len) {
773 buf->offset = 0;
774 buf->len = 0;
775 *last_chunk = 1;
776 }
777
778 GST_LOG ("sent : %" G_GSIZE_FORMAT, bytes_to_send);
779
780 return bytes_to_send;
781 }
782
783 static size_t
gst_curl_base_sink_transfer_data_buffer(GstCurlBaseSink * sink,void * curl_ptr,size_t block_size,guint * last_chunk)784 gst_curl_base_sink_transfer_data_buffer (GstCurlBaseSink * sink,
785 void *curl_ptr, size_t block_size, guint * last_chunk)
786 {
787 TransferBuffer *buffer;
788
789 buffer = sink->transfer_buf;
790 GST_LOG ("write buf len=%" G_GSIZE_FORMAT ", offset=%" G_GSIZE_FORMAT,
791 buffer->len, buffer->offset);
792
793 if (buffer->len <= 0) {
794 GST_WARNING ("got zero- or negative-length buffer");
795
796 return 0;
797 }
798
799 /* more data in buffer(s) */
800 return transfer_data_buffer (curl_ptr, buffer, block_size, last_chunk);
801 }
802
803 static size_t
gst_curl_base_sink_transfer_read_cb(void * curl_ptr,size_t size,size_t nmemb,void * stream)804 gst_curl_base_sink_transfer_read_cb (void *curl_ptr, size_t size, size_t nmemb,
805 void *stream)
806 {
807 GstCurlBaseSink *sink;
808 GstCurlBaseSinkClass *klass;
809 size_t max_bytes_to_send;
810 size_t bytes_to_send;
811 guint last_chunk = 0;
812
813 sink = (GstCurlBaseSink *) stream;
814 klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
815
816 max_bytes_to_send = size * nmemb;
817
818 /* wait for data to come available, if new file or thread close is set
819 * then zero will be returned to indicate end of current transfer */
820 GST_OBJECT_LOCK (sink);
821 if (gst_curl_base_sink_wait_for_data_unlocked (sink) == FALSE) {
822
823 if (gst_curl_base_sink_has_buffered_data_unlocked (sink) &&
824 sink->transfer_thread_close) {
825 GST_WARNING_OBJECT (sink,
826 "discarding render data due to thread close flag");
827
828 GST_OBJECT_UNLOCK (sink);
829 return CURL_READFUNC_ABORT;
830 }
831
832 if (klass->flush_data_unlocked) {
833 bytes_to_send = klass->flush_data_unlocked (sink, curl_ptr,
834 max_bytes_to_send, sink->new_file, sink->transfer_thread_close);
835
836 GST_OBJECT_UNLOCK (sink);
837
838 return bytes_to_send;
839 }
840
841 GST_OBJECT_UNLOCK (sink);
842 GST_LOG ("returning 0, no more data to send in this file");
843
844 return 0;
845 }
846
847 GST_OBJECT_UNLOCK (sink);
848
849 bytes_to_send = klass->transfer_data_buffer (sink, curl_ptr,
850 max_bytes_to_send, &last_chunk);
851
852 /* the last data chunk */
853 if (last_chunk) {
854 gst_curl_base_sink_data_sent_notify (sink);
855 }
856
857 return bytes_to_send;
858 }
859
860 static size_t
gst_curl_base_sink_transfer_write_cb(void G_GNUC_UNUSED * ptr,size_t size,size_t nmemb,void G_GNUC_UNUSED * stream)861 gst_curl_base_sink_transfer_write_cb (void G_GNUC_UNUSED * ptr, size_t size,
862 size_t nmemb, void G_GNUC_UNUSED * stream)
863 {
864 GstCurlBaseSink *sink;
865 GstCurlBaseSinkClass *klass;
866 size_t realsize = size * nmemb;
867
868 sink = (GstCurlBaseSink *) stream;
869 klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
870
871 if (klass->transfer_verify_response_code) {
872 if (!klass->transfer_verify_response_code (sink)) {
873 GST_DEBUG_OBJECT (sink, "response error");
874 GST_OBJECT_LOCK (sink);
875 sink->flow_ret = GST_FLOW_ERROR;
876 GST_OBJECT_UNLOCK (sink);
877 }
878 }
879
880 GST_DEBUG ("response %s", (gchar *) ptr);
881
882 return realsize;
883 }
884
885 CURLcode
gst_curl_base_sink_transfer_check(GstCurlBaseSink * sink)886 gst_curl_base_sink_transfer_check (GstCurlBaseSink * sink)
887 {
888 CURLcode code = CURLE_OK;
889 CURL *easy;
890 CURLMsg *msg;
891 gint msgs_left;
892 gchar *eff_url = NULL;
893
894 do {
895 easy = NULL;
896 while ((msg = curl_multi_info_read (sink->multi_handle, &msgs_left))) {
897 if (msg->msg == CURLMSG_DONE) {
898 easy = msg->easy_handle;
899 code = msg->data.result;
900 break;
901 }
902 }
903 if (easy) {
904 curl_easy_getinfo (easy, CURLINFO_EFFECTIVE_URL, &eff_url);
905 GST_DEBUG ("transfer done %s (%s-%d)\n", eff_url,
906 curl_easy_strerror (code), code);
907 }
908 } while (easy);
909
910 return code;
911 }
912
913 static void
handle_transfer(GstCurlBaseSink * sink)914 handle_transfer (GstCurlBaseSink * sink)
915 {
916 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
917 gint retval;
918 gint activated_fds;
919 gint running_handles;
920 gint timeout;
921 CURLMcode m_code;
922 CURLcode e_code;
923
924 GST_OBJECT_LOCK (sink);
925 timeout = sink->timeout;
926 GST_OBJECT_UNLOCK (sink);
927
928 GST_DEBUG_OBJECT (sink, "handling transfers");
929
930 /* Receiving CURLM_CALL_MULTI_PERFORM means that libcurl may have more data
931 available to send or receive - call simply curl_multi_perform before
932 poll() on more actions */
933 do {
934 m_code = curl_multi_perform (sink->multi_handle, &running_handles);
935 } while (m_code == CURLM_CALL_MULTI_PERFORM);
936 GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
937
938 while (running_handles && (m_code == CURLM_OK)) {
939 if (klass->transfer_prepare_poll_wait) {
940 klass->transfer_prepare_poll_wait (sink);
941 }
942
943 activated_fds = gst_poll_wait (sink->fdset, timeout * GST_SECOND);
944 if (G_UNLIKELY (activated_fds == -1)) {
945 if (errno == EAGAIN || errno == EINTR) {
946 GST_DEBUG_OBJECT (sink, "interrupted by signal");
947 } else if (errno == EBUSY) {
948 GST_DEBUG_OBJECT (sink, "poll stopped");
949 retval = GST_FLOW_EOS;
950
951 GST_OBJECT_LOCK (sink);
952 if (gst_curl_base_sink_has_buffered_data_unlocked (sink))
953 GST_WARNING_OBJECT (sink,
954 "discarding render data due to thread close flag");
955 GST_OBJECT_UNLOCK (sink);
956
957 goto fail;
958 } else {
959 sink->error = g_strdup_printf ("poll failed: %s", g_strerror (errno));
960 retval = GST_FLOW_ERROR;
961 goto fail;
962 }
963 } else if (G_UNLIKELY (activated_fds == 0)) {
964 sink->error = g_strdup_printf ("poll timed out after %" GST_TIME_FORMAT,
965 GST_TIME_ARGS (timeout * GST_SECOND));
966 retval = GST_FLOW_ERROR;
967 goto fail;
968 }
969
970 /* readable/writable sockets */
971 do {
972 m_code = curl_multi_perform (sink->multi_handle, &running_handles);
973 } while (m_code == CURLM_CALL_MULTI_PERFORM);
974 GST_DEBUG_OBJECT (sink, "running handles: %d", running_handles);
975 }
976
977 if (m_code != CURLM_OK) {
978 sink->error = g_strdup_printf ("failed to write data: %s",
979 curl_multi_strerror (m_code));
980 retval = GST_FLOW_ERROR;
981 goto fail;
982 }
983
984 /* problems still might have occurred on individual transfers even when
985 * curl_multi_perform returns CURLM_OK */
986 if ((e_code = gst_curl_base_sink_transfer_check (sink)) != CURLE_OK) {
987 sink->error = g_strdup_printf ("failed to transfer data: %s",
988 curl_easy_strerror (e_code));
989 retval = GST_FLOW_ERROR;
990 goto fail;
991 }
992
993 gst_curl_base_sink_got_response_notify (sink);
994
995 GST_OBJECT_LOCK (sink);
996 if (sink->socket_type == CURLSOCKTYPE_ACCEPT) {
997 /* FIXME: remove this again once we can depend on libcurl > 7.44.0,
998 * see https://github.com/bagder/curl/issues/405.
999 */
1000 if (G_UNLIKELY (sink->fd.fd < 0)) {
1001 sink->error = g_strdup_printf ("unknown error");
1002 retval = GST_FLOW_ERROR;
1003 GST_OBJECT_UNLOCK (sink);
1004 goto fail;
1005 }
1006 if (!gst_poll_remove_fd (sink->fdset, &sink->fd)) {
1007 sink->error = g_strdup_printf ("failed to remove fd");
1008 retval = GST_FLOW_ERROR;
1009 GST_OBJECT_UNLOCK (sink);
1010 goto fail;
1011 }
1012 sink->fd.fd = -1;
1013 }
1014 GST_OBJECT_UNLOCK (sink);
1015
1016 return;
1017
1018 fail:
1019 GST_OBJECT_LOCK (sink);
1020 if (sink->flow_ret == GST_FLOW_OK) {
1021 sink->flow_ret = retval;
1022 }
1023 GST_OBJECT_UNLOCK (sink);
1024 return;
1025 }
1026
1027 #ifndef GST_DISABLE_GST_DEBUG
1028 static int
gst_curl_base_sink_debug_cb(CURL * handle,curl_infotype type,char * data,size_t size,void * clientp)1029 gst_curl_base_sink_debug_cb (CURL * handle, curl_infotype type, char *data,
1030 size_t size, void *clientp)
1031 {
1032 GstCurlBaseSink *sink = (GstCurlBaseSink *) clientp;
1033 gchar *msg = NULL;
1034
1035 switch (type) {
1036 case CURLINFO_TEXT:
1037 case CURLINFO_HEADER_IN:
1038 case CURLINFO_HEADER_OUT:
1039 msg = g_memdup (data, size);
1040 if (size > 0) {
1041 msg[size - 1] = '\0';
1042 g_strchomp (msg);
1043 }
1044 break;
1045 default:
1046 break;
1047 }
1048
1049 switch (type) {
1050 case CURLINFO_TEXT:
1051 GST_DEBUG_OBJECT (sink, "%s", msg);
1052 break;
1053 case CURLINFO_HEADER_IN:
1054 GST_DEBUG_OBJECT (sink, "incoming header: %s", msg);
1055 break;
1056 case CURLINFO_HEADER_OUT:
1057 GST_DEBUG_OBJECT (sink, "outgoing header: %s", msg);
1058 break;
1059 case CURLINFO_DATA_IN:
1060 GST_MEMDUMP_OBJECT (sink, "incoming data", (guint8 *) data, size);
1061 break;
1062 case CURLINFO_DATA_OUT:
1063 GST_MEMDUMP_OBJECT (sink, "outgoing data", (guint8 *) data, size);
1064 break;
1065 case CURLINFO_SSL_DATA_IN:
1066 GST_MEMDUMP_OBJECT (sink, "incoming ssl data", (guint8 *) data, size);
1067 break;
1068 case CURLINFO_SSL_DATA_OUT:
1069 GST_MEMDUMP_OBJECT (sink, "outgoing ssl data", (guint8 *) data, size);
1070 break;
1071 default:
1072 GST_DEBUG_OBJECT (sink, "unknown debug info type %d", type);
1073 GST_MEMDUMP_OBJECT (sink, "unknown data", (guint8 *) data, size);
1074 break;
1075 }
1076 g_free (msg);
1077 return 0;
1078 }
1079 #endif
1080
1081 /* This function gets called by libcurl after the socket() call but before
1082 * the connect() call. */
1083 static int
gst_curl_base_sink_transfer_socket_cb(void * clientp,curl_socket_t curlfd,curlsocktype socket_type)1084 gst_curl_base_sink_transfer_socket_cb (void *clientp, curl_socket_t curlfd,
1085 curlsocktype socket_type)
1086 {
1087 GstCurlBaseSink *sink;
1088 gboolean ret = TRUE;
1089
1090 sink = (GstCurlBaseSink *) clientp;
1091
1092 g_assert (sink);
1093
1094 if (curlfd < 0) {
1095 /* signal an unrecoverable error to the library which will close the socket
1096 and return CURLE_COULDNT_CONNECT
1097 */
1098 GST_DEBUG_OBJECT (sink, "no curlfd");
1099 return 1;
1100 }
1101
1102 GST_OBJECT_LOCK (sink);
1103 sink->socket_type = socket_type;
1104
1105 if (sink->fd.fd != curlfd) {
1106 if (sink->fd.fd > 0 && sink->socket_type != CURLSOCKTYPE_ACCEPT) {
1107 ret &= gst_poll_remove_fd (sink->fdset, &sink->fd);
1108 }
1109 sink->fd.fd = curlfd;
1110 ret &= gst_poll_add_fd (sink->fdset, &sink->fd);
1111 ret &= gst_poll_fd_ctl_write (sink->fdset, &sink->fd, TRUE);
1112 ret &= gst_poll_fd_ctl_read (sink->fdset, &sink->fd, TRUE);
1113 }
1114 GST_DEBUG_OBJECT (sink, "fd: %d", sink->fd.fd);
1115 gst_curl_base_sink_setup_dscp_unlocked (sink);
1116 GST_OBJECT_UNLOCK (sink);
1117
1118 /* success */
1119 return ret ? 0 : 1;
1120 }
1121
1122 static gboolean
gst_curl_base_sink_transfer_start_unlocked(GstCurlBaseSink * sink)1123 gst_curl_base_sink_transfer_start_unlocked (GstCurlBaseSink * sink)
1124 {
1125 GError *error = NULL;
1126 gboolean ret = TRUE;
1127
1128 GST_LOG ("creating transfer thread");
1129 sink->transfer_thread_close = FALSE;
1130 sink->new_file = TRUE;
1131 sink->transfer_thread = g_thread_try_new ("curl-transfer", (GThreadFunc)
1132 gst_curl_base_sink_transfer_thread_func, sink, &error);
1133
1134 if (sink->transfer_thread == NULL || error != NULL) {
1135 ret = FALSE;
1136 if (error) {
1137 GST_ERROR_OBJECT (sink, "could not create thread %s", error->message);
1138 g_error_free (error);
1139 } else {
1140 GST_ERROR_OBJECT (sink, "could not create thread for unknown reason");
1141 }
1142 }
1143
1144 return ret;
1145 }
1146
1147 static gpointer
gst_curl_base_sink_transfer_thread_func(gpointer data)1148 gst_curl_base_sink_transfer_thread_func (gpointer data)
1149 {
1150 GstCurlBaseSink *sink = (GstCurlBaseSink *) data;
1151 GstCurlBaseSinkClass *klass = GST_CURL_BASE_SINK_GET_CLASS (sink);
1152 GstFlowReturn ret;
1153 gboolean data_available;
1154
1155 GST_LOG ("transfer thread started");
1156 GST_OBJECT_LOCK (sink);
1157 if (!gst_curl_base_sink_transfer_setup_unlocked (sink)) {
1158 /* no need to set sink->error, as it is set by the called function */
1159 sink->flow_ret = GST_FLOW_ERROR;
1160 goto done;
1161 }
1162
1163 while (!sink->transfer_thread_close && sink->flow_ret == GST_FLOW_OK) {
1164 /* we are working on a new file, clearing flag and setting a new file
1165 * name */
1166 sink->new_file = FALSE;
1167
1168 /* wait for data to arrive for this new file, if we get a new file name
1169 * again before getting data we will simply skip transferring anything
1170 * for this file and go directly to the new file */
1171 data_available = gst_curl_base_sink_wait_for_data_unlocked (sink);
1172 if (data_available) {
1173 if (G_UNLIKELY (!klass->set_protocol_dynamic_options_unlocked (sink))) {
1174 sink->error = g_strdup ("unexpected state");
1175 sink->flow_ret = GST_FLOW_ERROR;
1176 goto done;
1177 }
1178 }
1179
1180 /* stay unlocked while handling the actual transfer */
1181 GST_OBJECT_UNLOCK (sink);
1182
1183 if (data_available) {
1184 GST_LOG ("have data");
1185 if (!gst_curl_base_sink_is_live (sink)) {
1186 /* prepare transfer if needed */
1187 if (klass->prepare_transfer) {
1188 GST_OBJECT_LOCK (sink);
1189 if (!klass->prepare_transfer (sink)) {
1190 sink->flow_ret = GST_FLOW_ERROR;
1191 goto done;
1192 }
1193 GST_OBJECT_UNLOCK (sink);
1194 }
1195 GST_LOG ("adding handle");
1196 curl_multi_add_handle (sink->multi_handle, sink->curl);
1197 }
1198
1199 /* Start driving the transfer. */
1200 klass->handle_transfer (sink);
1201
1202 /* easy handle will be possibly re-used for next transfer, thus it needs
1203 * to be removed from the multi stack and re-added again */
1204 if (!gst_curl_base_sink_is_live (sink)) {
1205 GST_LOG ("removing handle");
1206 curl_multi_remove_handle (sink->multi_handle, sink->curl);
1207 }
1208 } else {
1209 GST_LOG ("have no data yet");
1210 }
1211
1212 /* lock again before looping to check the thread closed flag */
1213 GST_OBJECT_LOCK (sink);
1214 }
1215
1216 if (sink->is_live) {
1217 GST_LOG ("removing handle");
1218 curl_multi_remove_handle (sink->multi_handle, sink->curl);
1219 }
1220
1221 done:
1222 gst_curl_base_sink_transfer_cleanup (sink);
1223
1224 /* extract the error code so the lock does not have to be
1225 * taken when calling the functions below that take the lock
1226 * on their own */
1227 ret = sink->flow_ret;
1228 GST_OBJECT_UNLOCK (sink);
1229
1230 /* if there is a flow error, always notify the render function so it
1231 * can return the flow error up along the pipeline. as an error has
1232 * occurred there is no response to receive, so notify the event function
1233 * so it doesn't block indefinitely waiting for a response. */
1234 if (ret != GST_FLOW_OK) {
1235 gst_curl_base_sink_data_sent_notify (sink);
1236 gst_curl_base_sink_got_response_notify (sink);
1237 }
1238
1239 GST_DEBUG ("exit thread func - transfer thread close flag: %d",
1240 sink->transfer_thread_close);
1241
1242 return NULL;
1243 }
1244
1245 static gboolean
gst_curl_base_sink_transfer_setup_unlocked(GstCurlBaseSink * sink)1246 gst_curl_base_sink_transfer_setup_unlocked (GstCurlBaseSink * sink)
1247 {
1248 g_assert (sink);
1249
1250 if (sink->curl == NULL) {
1251 /* curl_easy_init automatically calls curl_global_init(3) */
1252 if ((sink->curl = curl_easy_init ()) == NULL) {
1253 sink->error = g_strdup ("failed to init curl easy handle");
1254 return FALSE;
1255 }
1256 }
1257
1258 if (!gst_curl_base_sink_transfer_set_options_unlocked (sink)) {
1259 if (!sink->error) {
1260 sink->error = g_strdup ("failed to setup curl easy handle");
1261 }
1262 return FALSE;
1263 }
1264
1265 /* init a multi stack (non-blocking interface to libcurl) */
1266 if (sink->multi_handle == NULL) {
1267 if ((sink->multi_handle = curl_multi_init ()) == NULL) {
1268 sink->error = g_strdup ("failed to init curl multi handle");
1269 return FALSE;
1270 }
1271 }
1272
1273 GST_LOG ("transfer setup done");
1274 return TRUE;
1275 }
1276
1277 static void
gst_curl_base_sink_transfer_cleanup(GstCurlBaseSink * sink)1278 gst_curl_base_sink_transfer_cleanup (GstCurlBaseSink * sink)
1279 {
1280 if (sink->curl != NULL) {
1281 if (sink->multi_handle != NULL) {
1282 curl_multi_remove_handle (sink->multi_handle, sink->curl);
1283 }
1284 curl_easy_cleanup (sink->curl);
1285 sink->curl = NULL;
1286 }
1287
1288 if (sink->multi_handle != NULL) {
1289 curl_multi_cleanup (sink->multi_handle);
1290 sink->multi_handle = NULL;
1291 }
1292 }
1293
1294 static gboolean
gst_curl_base_sink_wait_for_data_unlocked(GstCurlBaseSink * sink)1295 gst_curl_base_sink_wait_for_data_unlocked (GstCurlBaseSink * sink)
1296 {
1297 gboolean data_available = FALSE;
1298
1299 GST_LOG ("waiting for data");
1300 while (!sink->transfer_cond->data_available &&
1301 !sink->transfer_thread_close && !sink->new_file) {
1302 g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1303 }
1304
1305 if (sink->transfer_thread_close) {
1306 GST_LOG ("wait for data aborted due to thread close");
1307 } else if (sink->new_file) {
1308 GST_LOG ("wait for data aborted due to new file name");
1309 } else {
1310 GST_LOG ("wait for data completed");
1311 data_available = TRUE;
1312 }
1313
1314 return data_available;
1315 }
1316
1317 static void
gst_curl_base_sink_new_file_notify_unlocked(GstCurlBaseSink * sink)1318 gst_curl_base_sink_new_file_notify_unlocked (GstCurlBaseSink * sink)
1319 {
1320 GST_LOG ("new file name");
1321 sink->new_file = TRUE;
1322 g_cond_signal (&sink->transfer_cond->cond);
1323 }
1324
1325 static void
gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked(GstCurlBaseSink * sink)1326 gst_curl_base_sink_wait_for_transfer_thread_to_send_unlocked
1327 (GstCurlBaseSink * sink)
1328 {
1329 GST_LOG ("waiting for buffer send to complete");
1330
1331 /* this function should not check if the transfer thread is set to be closed
1332 * since that flag only can be set by the EOS event (by the pipeline thread).
1333 * This can therefore never happen while this function is running since this
1334 * function also is called by the pipeline thread (in the render function) */
1335 while (!sink->transfer_cond->data_sent) {
1336 g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1337 }
1338 GST_LOG ("buffer send completed");
1339 }
1340
1341 static void
gst_curl_base_sink_data_sent_notify(GstCurlBaseSink * sink)1342 gst_curl_base_sink_data_sent_notify (GstCurlBaseSink * sink)
1343 {
1344 GST_LOG ("transfer completed");
1345 GST_OBJECT_LOCK (sink);
1346 sink->transfer_cond->data_available = FALSE;
1347 sink->transfer_cond->data_sent = TRUE;
1348 g_cond_signal (&sink->transfer_cond->cond);
1349 GST_OBJECT_UNLOCK (sink);
1350 }
1351
1352 static void
gst_curl_base_sink_wait_for_response(GstCurlBaseSink * sink)1353 gst_curl_base_sink_wait_for_response (GstCurlBaseSink * sink)
1354 {
1355 GST_LOG ("waiting for remote to send response code");
1356
1357 GST_OBJECT_LOCK (sink);
1358 while (sink->transfer_cond->wait_for_response) {
1359 g_cond_wait (&sink->transfer_cond->cond, GST_OBJECT_GET_LOCK (sink));
1360 }
1361 GST_OBJECT_UNLOCK (sink);
1362
1363 GST_LOG ("response code received");
1364 }
1365
1366 static void
gst_curl_base_sink_got_response_notify(GstCurlBaseSink * sink)1367 gst_curl_base_sink_got_response_notify (GstCurlBaseSink * sink)
1368 {
1369 GST_LOG ("got response code");
1370
1371 GST_OBJECT_LOCK (sink);
1372 sink->transfer_cond->wait_for_response = FALSE;
1373 g_cond_signal (&sink->transfer_cond->cond);
1374 GST_OBJECT_UNLOCK (sink);
1375 }
1376
1377 static gint
gst_curl_base_sink_setup_dscp_unlocked(GstCurlBaseSink * sink)1378 gst_curl_base_sink_setup_dscp_unlocked (GstCurlBaseSink * sink)
1379 {
1380 gint tos;
1381 gint af;
1382 gint ret = -1;
1383 union
1384 {
1385 struct sockaddr sa;
1386 struct sockaddr_in6 sa_in6;
1387 struct sockaddr_storage sa_stor;
1388 } sa;
1389 socklen_t slen = sizeof (sa);
1390
1391 if (getsockname (sink->fd.fd, &sa.sa, &slen) < 0) {
1392 GST_DEBUG_OBJECT (sink, "could not get sockname: %s", g_strerror (errno));
1393 return ret;
1394 }
1395 af = sa.sa.sa_family;
1396
1397 /* if this is an IPv4-mapped address then do IPv4 QoS */
1398 if (af == AF_INET6) {
1399 GST_DEBUG_OBJECT (sink, "check IP6 socket");
1400 if (IN6_IS_ADDR_V4MAPPED (&(sa.sa_in6.sin6_addr))) {
1401 GST_DEBUG_OBJECT (sink, "mapped to IPV4");
1402 af = AF_INET;
1403 }
1404 }
1405 /* extract and shift 6 bits of the DSCP */
1406 tos = (sink->qos_dscp & 0x3f) << 2;
1407
1408 switch (af) {
1409 case AF_INET:
1410 ret = setsockopt (sink->fd.fd, IPPROTO_IP, IP_TOS, (void *) &tos,
1411 sizeof (tos));
1412 break;
1413 case AF_INET6:
1414 #ifdef IPV6_TCLASS
1415 ret = setsockopt (sink->fd.fd, IPPROTO_IPV6, IPV6_TCLASS, (void *) &tos,
1416 sizeof (tos));
1417 break;
1418 #endif
1419 default:
1420 GST_ERROR_OBJECT (sink, "unsupported AF");
1421 break;
1422 }
1423 if (ret) {
1424 GST_DEBUG_OBJECT (sink, "could not set DSCP: %s", g_strerror (errno));
1425 }
1426
1427 return ret;
1428 }
1429