1 /*
2  * GStreamer
3  * Copyright (C) 2010 Jan Schmidt <thaytan@noraisin.net>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 
21 /**
22  * SECTION:element-rtmpsink
23  * @title: rtmpsink
24  *
25  * This element delivers data to a streaming server via RTMP. It uses
26  * librtmp, and supports any protocols/urls that librtmp supports.
27  * The URL/location can contain extra connection or session parameters
28  * for librtmp, such as 'flashver=version'. See the librtmp documentation
29  * for more detail
30  *
31  * ## Example launch line
32  * |[
33  * gst-launch-1.0 -v videotestsrc ! ffenc_flv ! flvmux ! rtmpsink location='rtmp://localhost/path/to/stream live=1'
34  * ]| Encode a test video stream to FLV video format and stream it via RTMP.
35  *
36  */
37 
38 #ifdef HAVE_CONFIG_H
39 #include "config.h"
40 #endif
41 
42 #include <gst/gst.h>
43 
44 #include "gstrtmpsink.h"
45 
46 #ifdef G_OS_WIN32
47 #include <winsock2.h>
48 #endif
49 
50 #include <stdlib.h>
51 
52 GST_DEBUG_CATEGORY_STATIC (gst_rtmp_sink_debug);
53 #define GST_CAT_DEFAULT gst_rtmp_sink_debug
54 
55 #define DEFAULT_LOCATION NULL
56 
57 enum
58 {
59   PROP_0,
60   PROP_LOCATION
61 };
62 
63 static GstStaticPadTemplate sink_template = GST_STATIC_PAD_TEMPLATE ("sink",
64     GST_PAD_SINK,
65     GST_PAD_ALWAYS,
66     GST_STATIC_CAPS ("video/x-flv")
67     );
68 
69 static void gst_rtmp_sink_uri_handler_init (gpointer g_iface,
70     gpointer iface_data);
71 static void gst_rtmp_sink_set_property (GObject * object, guint prop_id,
72     const GValue * value, GParamSpec * pspec);
73 static void gst_rtmp_sink_get_property (GObject * object, guint prop_id,
74     GValue * value, GParamSpec * pspec);
75 static void gst_rtmp_sink_finalize (GObject * object);
76 static gboolean gst_rtmp_sink_stop (GstBaseSink * sink);
77 static gboolean gst_rtmp_sink_start (GstBaseSink * sink);
78 static gboolean gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event);
79 static gboolean gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps);
80 static GstFlowReturn gst_rtmp_sink_render (GstBaseSink * sink, GstBuffer * buf);
81 
82 #define gst_rtmp_sink_parent_class parent_class
83 G_DEFINE_TYPE_WITH_CODE (GstRTMPSink, gst_rtmp_sink, GST_TYPE_BASE_SINK,
84     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
85         gst_rtmp_sink_uri_handler_init));
86 
87 /* initialize the plugin's class */
88 static void
gst_rtmp_sink_class_init(GstRTMPSinkClass * klass)89 gst_rtmp_sink_class_init (GstRTMPSinkClass * klass)
90 {
91   GObjectClass *gobject_class;
92   GstElementClass *gstelement_class;
93   GstBaseSinkClass *gstbasesink_class;
94 
95   gobject_class = (GObjectClass *) klass;
96   gstelement_class = (GstElementClass *) klass;
97   gstbasesink_class = (GstBaseSinkClass *) klass;
98 
99   gobject_class->finalize = gst_rtmp_sink_finalize;
100   gobject_class->set_property = gst_rtmp_sink_set_property;
101   gobject_class->get_property = gst_rtmp_sink_get_property;
102 
103   g_object_class_install_property (gobject_class, PROP_LOCATION,
104       g_param_spec_string ("location", "RTMP Location", "RTMP url",
105           DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
106 
107   gst_element_class_set_static_metadata (gstelement_class,
108       "RTMP output sink",
109       "Sink/Network", "Sends FLV content to a server via RTMP",
110       "Jan Schmidt <thaytan@noraisin.net>");
111 
112   gst_element_class_add_static_pad_template (gstelement_class, &sink_template);
113 
114   gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_sink_start);
115   gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_sink_stop);
116   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_rtmp_sink_render);
117   gstbasesink_class->set_caps = GST_DEBUG_FUNCPTR (gst_rtmp_sink_setcaps);
118   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_rtmp_sink_event);
119 
120   GST_DEBUG_CATEGORY_INIT (gst_rtmp_sink_debug, "rtmpsink", 0,
121       "RTMP server element");
122 }
123 
124 /* initialize the new element
125  * initialize instance structure
126  */
127 static void
gst_rtmp_sink_init(GstRTMPSink * sink)128 gst_rtmp_sink_init (GstRTMPSink * sink)
129 {
130 #ifdef G_OS_WIN32
131   WSADATA wsa_data;
132 
133   if (WSAStartup (MAKEWORD (2, 2), &wsa_data) != 0) {
134     GST_ERROR_OBJECT (sink, "WSAStartup failed: 0x%08x", WSAGetLastError ());
135   }
136 #endif
137 }
138 
139 static void
gst_rtmp_sink_finalize(GObject * object)140 gst_rtmp_sink_finalize (GObject * object)
141 {
142   GstRTMPSink *sink = GST_RTMP_SINK (object);
143 
144 #ifdef G_OS_WIN32
145   WSACleanup ();
146 #endif
147   g_free (sink->uri);
148 
149   G_OBJECT_CLASS (parent_class)->finalize (object);
150 }
151 
152 
153 static gboolean
gst_rtmp_sink_start(GstBaseSink * basesink)154 gst_rtmp_sink_start (GstBaseSink * basesink)
155 {
156   GstRTMPSink *sink = GST_RTMP_SINK (basesink);
157 
158   if (!sink->uri) {
159     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
160         ("Please set URI for RTMP output"), ("No URI set before starting"));
161     return FALSE;
162   }
163 
164   sink->rtmp_uri = g_strdup (sink->uri);
165   sink->rtmp = RTMP_Alloc ();
166 
167   if (!sink->rtmp) {
168     GST_ERROR_OBJECT (sink, "Could not allocate librtmp's RTMP context");
169     goto error;
170   }
171 
172   RTMP_Init (sink->rtmp);
173   if (!RTMP_SetupURL (sink->rtmp, sink->rtmp_uri)) {
174     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
175         ("Failed to setup URL '%s'", sink->uri));
176     goto error;
177   }
178 
179   GST_DEBUG_OBJECT (sink, "Created RTMP object");
180 
181   /* Mark this as an output connection */
182   RTMP_EnableWrite (sink->rtmp);
183 
184   sink->first = TRUE;
185   sink->have_write_error = FALSE;
186 
187   return TRUE;
188 
189 error:
190   if (sink->rtmp) {
191     RTMP_Free (sink->rtmp);
192     sink->rtmp = NULL;
193   }
194   g_free (sink->rtmp_uri);
195   sink->rtmp_uri = NULL;
196   return FALSE;
197 }
198 
199 static gboolean
gst_rtmp_sink_stop(GstBaseSink * basesink)200 gst_rtmp_sink_stop (GstBaseSink * basesink)
201 {
202   GstRTMPSink *sink = GST_RTMP_SINK (basesink);
203 
204   if (sink->header) {
205     gst_buffer_unref (sink->header);
206     sink->header = NULL;
207   }
208   if (sink->rtmp) {
209     RTMP_Close (sink->rtmp);
210     RTMP_Free (sink->rtmp);
211     sink->rtmp = NULL;
212   }
213   if (sink->rtmp_uri) {
214     g_free (sink->rtmp_uri);
215     sink->rtmp_uri = NULL;
216   }
217 
218   return TRUE;
219 }
220 
221 static GstFlowReturn
gst_rtmp_sink_render(GstBaseSink * bsink,GstBuffer * buf)222 gst_rtmp_sink_render (GstBaseSink * bsink, GstBuffer * buf)
223 {
224   GstRTMPSink *sink = GST_RTMP_SINK (bsink);
225   gboolean need_unref = FALSE;
226   GstMapInfo map = GST_MAP_INFO_INIT;
227 
228   if (sink->rtmp == NULL) {
229     /* Do not crash */
230     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data"));
231     return GST_FLOW_ERROR;
232   }
233 
234   /* Ignore buffers that are in the stream headers (caps) */
235   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_HEADER)) {
236     return GST_FLOW_OK;
237   }
238 
239   if (sink->first) {
240     /* open the connection */
241     if (!RTMP_IsConnected (sink->rtmp)) {
242       if (!RTMP_Connect (sink->rtmp, NULL))
243         goto connection_failed;
244 
245       if (!RTMP_ConnectStream (sink->rtmp, 0)) {
246         RTMP_Close (sink->rtmp);
247         goto connection_failed;
248       }
249 
250       GST_DEBUG_OBJECT (sink, "Opened connection to %s", sink->rtmp_uri);
251     }
252 
253     /* Prepend the header from the caps to the first non header buffer */
254     if (sink->header) {
255       buf = gst_buffer_append (gst_buffer_ref (sink->header),
256           gst_buffer_ref (buf));
257       need_unref = TRUE;
258     }
259 
260     sink->first = FALSE;
261   }
262 
263   if (sink->have_write_error)
264     goto write_failed;
265 
266   GST_LOG_OBJECT (sink, "Sending %" G_GSIZE_FORMAT " bytes to RTMP server",
267       gst_buffer_get_size (buf));
268 
269   gst_buffer_map (buf, &map, GST_MAP_READ);
270 
271   if (RTMP_Write (sink->rtmp, (char *) map.data, map.size) <= 0)
272     goto write_failed;
273 
274   gst_buffer_unmap (buf, &map);
275   if (need_unref)
276     gst_buffer_unref (buf);
277 
278   return GST_FLOW_OK;
279 
280   /* ERRORS */
281 write_failed:
282   {
283     GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL), ("Failed to write data"));
284     gst_buffer_unmap (buf, &map);
285     if (need_unref)
286       gst_buffer_unref (buf);
287     sink->have_write_error = TRUE;
288     return GST_FLOW_ERROR;
289   }
290 
291 connection_failed:
292   {
293     GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE, (NULL),
294         ("Could not connect to RTMP stream \"%s\" for writing", sink->uri));
295     RTMP_Free (sink->rtmp);
296     sink->rtmp = NULL;
297     g_free (sink->rtmp_uri);
298     sink->rtmp_uri = NULL;
299     sink->have_write_error = TRUE;
300 
301     return GST_FLOW_ERROR;
302   }
303 }
304 
305 /*
306  * URI interface support.
307  */
308 static GstURIType
gst_rtmp_sink_uri_get_type(GType type)309 gst_rtmp_sink_uri_get_type (GType type)
310 {
311   return GST_URI_SINK;
312 }
313 
314 static const gchar *const *
gst_rtmp_sink_uri_get_protocols(GType type)315 gst_rtmp_sink_uri_get_protocols (GType type)
316 {
317   static const gchar *protocols[] =
318       { "rtmp", "rtmpt", "rtmps", "rtmpe", "rtmfp", "rtmpte", "rtmpts", NULL };
319 
320   return protocols;
321 }
322 
323 static gchar *
gst_rtmp_sink_uri_get_uri(GstURIHandler * handler)324 gst_rtmp_sink_uri_get_uri (GstURIHandler * handler)
325 {
326   GstRTMPSink *sink = GST_RTMP_SINK (handler);
327 
328   /* FIXME: make thread-safe */
329   return g_strdup (sink->uri);
330 }
331 
332 static gboolean
gst_rtmp_sink_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)333 gst_rtmp_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
334     GError ** error)
335 {
336   GstRTMPSink *sink = GST_RTMP_SINK (handler);
337   gboolean ret = TRUE;
338 
339   if (GST_STATE (sink) >= GST_STATE_PAUSED) {
340     g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
341         "Changing the URI on rtmpsink when it is running is not supported");
342     return FALSE;
343   }
344 
345   g_free (sink->uri);
346   sink->uri = NULL;
347 
348   if (uri != NULL) {
349     int protocol;
350     AVal host;
351     unsigned int port;
352     AVal playpath, app;
353 
354     if (!RTMP_ParseURL (uri, &protocol, &host, &port, &playpath, &app) ||
355         !host.av_len) {
356       GST_ELEMENT_ERROR (sink, RESOURCE, OPEN_WRITE,
357           ("Failed to parse URI %s", uri), (NULL));
358       g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
359           "Could not parse RTMP URI");
360       ret = FALSE;
361     } else {
362       sink->uri = g_strdup (uri);
363     }
364 
365     if (playpath.av_val)
366       free (playpath.av_val);
367   }
368 
369   if (ret) {
370     sink->have_write_error = FALSE;
371     GST_DEBUG_OBJECT (sink, "Changed URI to %s", GST_STR_NULL (uri));
372   }
373 
374   return ret;
375 }
376 
377 static void
gst_rtmp_sink_uri_handler_init(gpointer g_iface,gpointer iface_data)378 gst_rtmp_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
379 {
380   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
381 
382   iface->get_type = gst_rtmp_sink_uri_get_type;
383   iface->get_protocols = gst_rtmp_sink_uri_get_protocols;
384   iface->get_uri = gst_rtmp_sink_uri_get_uri;
385   iface->set_uri = gst_rtmp_sink_uri_set_uri;
386 }
387 
388 static void
gst_rtmp_sink_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)389 gst_rtmp_sink_set_property (GObject * object, guint prop_id,
390     const GValue * value, GParamSpec * pspec)
391 {
392   GstRTMPSink *sink = GST_RTMP_SINK (object);
393 
394   switch (prop_id) {
395     case PROP_LOCATION:
396       gst_rtmp_sink_uri_set_uri (GST_URI_HANDLER (sink),
397           g_value_get_string (value), NULL);
398       break;
399     default:
400       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
401       break;
402   }
403 }
404 
405 static gboolean
gst_rtmp_sink_setcaps(GstBaseSink * sink,GstCaps * caps)406 gst_rtmp_sink_setcaps (GstBaseSink * sink, GstCaps * caps)
407 {
408   GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink);
409   GstStructure *s;
410   const GValue *sh;
411 
412   GST_DEBUG_OBJECT (sink, "caps set to %" GST_PTR_FORMAT, caps);
413 
414   /* Clear our current header buffer */
415   if (rtmpsink->header) {
416     gst_buffer_unref (rtmpsink->header);
417     rtmpsink->header = NULL;
418   }
419 
420   s = gst_caps_get_structure (caps, 0);
421 
422   sh = gst_structure_get_value (s, "streamheader");
423   if (sh == NULL)
424     goto out;
425 
426   if (GST_VALUE_HOLDS_BUFFER (sh)) {
427     rtmpsink->header = gst_buffer_ref (gst_value_get_buffer (sh));
428   } else if (GST_VALUE_HOLDS_ARRAY (sh)) {
429     GArray *buffers;
430     gint i;
431 
432     buffers = g_value_peek_pointer (sh);
433 
434     /* Concatenate all buffers in streamheader into one */
435     rtmpsink->header = gst_buffer_new ();
436     for (i = 0; i < buffers->len; ++i) {
437       GValue *val;
438       GstBuffer *buf;
439 
440       val = &g_array_index (buffers, GValue, i);
441       buf = g_value_peek_pointer (val);
442 
443       gst_buffer_ref (buf);
444 
445       rtmpsink->header = gst_buffer_append (rtmpsink->header, buf);
446     }
447   } else {
448     GST_ERROR_OBJECT (rtmpsink, "streamheader field has unexpected type %s",
449         G_VALUE_TYPE_NAME (sh));
450   }
451 
452   GST_DEBUG_OBJECT (rtmpsink, "have %" G_GSIZE_FORMAT " bytes of header data",
453       gst_buffer_get_size (rtmpsink->header));
454 
455 out:
456 
457   return TRUE;
458 }
459 
460 static gboolean
gst_rtmp_sink_event(GstBaseSink * sink,GstEvent * event)461 gst_rtmp_sink_event (GstBaseSink * sink, GstEvent * event)
462 {
463   GstRTMPSink *rtmpsink = GST_RTMP_SINK (sink);
464 
465   switch (event->type) {
466     case GST_EVENT_FLUSH_STOP:
467       rtmpsink->have_write_error = FALSE;
468       break;
469     default:
470       break;
471   }
472 
473   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
474 }
475 
476 static void
gst_rtmp_sink_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)477 gst_rtmp_sink_get_property (GObject * object, guint prop_id,
478     GValue * value, GParamSpec * pspec)
479 {
480   GstRTMPSink *sink = GST_RTMP_SINK (object);
481 
482   switch (prop_id) {
483     case PROP_LOCATION:
484       g_value_set_string (value, sink->uri);
485       break;
486     default:
487       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
488       break;
489   }
490 }
491