1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2002 Kristian Rietveld <kris@gtk.org>
5  *                    2002,2003 Colin Walters <walters@gnu.org>
6  *                    2001,2010 Bastien Nocera <hadess@hadess.net>
7  *                    2010 Sebastian Dröge <sebastian.droege@collabora.co.uk>
8  *
9  * rtmpsrc.c:
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  */
26 
27 /**
28  * SECTION:element-rtmpsrc
29  * @title: rtmpsrc
30  *
31  * This plugin reads data from a local or remote location specified
32  * by an URI. This location can be specified using any protocol supported by
33  * the RTMP library, i.e. rtmp, rtmpt, rtmps, rtmpe, rtmfp, rtmpte and rtmpts.
34  *
35  * ## Example launch lines
36  * |[
37  * gst-launch-1.0 -v rtmpsrc location=rtmp://somehost/someurl ! fakesink
38  * ]| Open an RTMP location and pass its content to fakesink.
39  *
40  */
41 
42 #ifdef HAVE_CONFIG_H
43 #include "config.h"
44 #endif
45 
46 #include <glib/gi18n-lib.h>
47 
48 #include "gstrtmpsrc.h"
49 
50 #include <stdio.h>
51 #include <stdlib.h>
52 #include <string.h>
53 
54 #include <gst/gst.h>
55 
56 #ifdef G_OS_WIN32
57 #include <winsock2.h>
58 #endif
59 
60 GST_DEBUG_CATEGORY_STATIC (rtmpsrc_debug);
61 #define GST_CAT_DEFAULT rtmpsrc_debug
62 
63 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
64     GST_PAD_SRC,
65     GST_PAD_ALWAYS,
66     GST_STATIC_CAPS_ANY);
67 
/* Property identifiers installed in class_init.  PROP_0 is only a
 * placeholder so that the real property ids start at 1. */
enum
{
  PROP_0,
  PROP_LOCATION,
  PROP_TIMEOUT
};
78 
79 #define DEFAULT_LOCATION NULL
80 #define DEFAULT_TIMEOUT 120
81 
82 static void gst_rtmp_src_uri_handler_init (gpointer g_iface,
83     gpointer iface_data);
84 
85 static void gst_rtmp_src_set_property (GObject * object, guint prop_id,
86     const GValue * value, GParamSpec * pspec);
87 static void gst_rtmp_src_get_property (GObject * object, guint prop_id,
88     GValue * value, GParamSpec * pspec);
89 static void gst_rtmp_src_finalize (GObject * object);
90 
91 static gboolean gst_rtmp_src_connect (GstRTMPSrc * src);
92 static gboolean gst_rtmp_src_unlock (GstBaseSrc * src);
93 static gboolean gst_rtmp_src_stop (GstBaseSrc * src);
94 static gboolean gst_rtmp_src_start (GstBaseSrc * src);
95 static gboolean gst_rtmp_src_is_seekable (GstBaseSrc * src);
96 static gboolean gst_rtmp_src_prepare_seek_segment (GstBaseSrc * src,
97     GstEvent * event, GstSegment * segment);
98 static gboolean gst_rtmp_src_do_seek (GstBaseSrc * src, GstSegment * segment);
99 static GstFlowReturn gst_rtmp_src_create (GstPushSrc * pushsrc,
100     GstBuffer ** buffer);
101 static gboolean gst_rtmp_src_query (GstBaseSrc * src, GstQuery * query);
102 
103 #define gst_rtmp_src_parent_class parent_class
104 G_DEFINE_TYPE_WITH_CODE (GstRTMPSrc, gst_rtmp_src, GST_TYPE_PUSH_SRC,
105     G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER,
106         gst_rtmp_src_uri_handler_init));
107 
108 static void
gst_rtmp_src_class_init(GstRTMPSrcClass * klass)109 gst_rtmp_src_class_init (GstRTMPSrcClass * klass)
110 {
111   GObjectClass *gobject_class;
112   GstElementClass *gstelement_class;
113   GstBaseSrcClass *gstbasesrc_class;
114   GstPushSrcClass *gstpushsrc_class;
115 
116   gobject_class = G_OBJECT_CLASS (klass);
117   gstelement_class = GST_ELEMENT_CLASS (klass);
118   gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
119   gstpushsrc_class = GST_PUSH_SRC_CLASS (klass);
120 
121   gobject_class->finalize = gst_rtmp_src_finalize;
122   gobject_class->set_property = gst_rtmp_src_set_property;
123   gobject_class->get_property = gst_rtmp_src_get_property;
124 
125   /* properties */
126   g_object_class_install_property (gobject_class, PROP_LOCATION,
127       g_param_spec_string ("location", "RTMP Location",
128           "Location of the RTMP url to read",
129           DEFAULT_LOCATION, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
130 
131   g_object_class_install_property (gobject_class, PROP_TIMEOUT,
132       g_param_spec_int ("timeout", "RTMP Timeout",
133           "Time without receiving any data from the server to wait before to timeout the session",
134           0, G_MAXINT,
135           DEFAULT_TIMEOUT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
136 
137   gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
138 
139   gst_element_class_set_static_metadata (gstelement_class,
140       "RTMP Source",
141       "Source/File",
142       "Read RTMP streams",
143       "Bastien Nocera <hadess@hadess.net>, "
144       "Sebastian Dröge <sebastian.droege@collabora.co.uk>");
145 
146   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_rtmp_src_start);
147   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_rtmp_src_stop);
148   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_rtmp_src_unlock);
149   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_rtmp_src_is_seekable);
150   gstbasesrc_class->prepare_seek_segment =
151       GST_DEBUG_FUNCPTR (gst_rtmp_src_prepare_seek_segment);
152   gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_rtmp_src_do_seek);
153   gstpushsrc_class->create = GST_DEBUG_FUNCPTR (gst_rtmp_src_create);
154   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_rtmp_src_query);
155 
156   GST_DEBUG_CATEGORY_INIT (rtmpsrc_debug, "rtmpsrc", 0, "RTMP Source");
157 }
158 
159 static void
gst_rtmp_src_init(GstRTMPSrc * rtmpsrc)160 gst_rtmp_src_init (GstRTMPSrc * rtmpsrc)
161 {
162 #ifdef G_OS_WIN32
163   WSADATA wsa_data;
164 
165   if (WSAStartup (MAKEWORD (2, 2), &wsa_data) != 0) {
166     GST_ERROR_OBJECT (rtmpsrc, "WSAStartup failed: 0x%08x", WSAGetLastError ());
167   }
168 #endif
169 
170   rtmpsrc->cur_offset = 0;
171   rtmpsrc->last_timestamp = 0;
172   rtmpsrc->timeout = DEFAULT_TIMEOUT;
173 
174   gst_base_src_set_format (GST_BASE_SRC (rtmpsrc), GST_FORMAT_TIME);
175 }
176 
177 static void
gst_rtmp_src_finalize(GObject * object)178 gst_rtmp_src_finalize (GObject * object)
179 {
180   GstRTMPSrc *rtmpsrc = GST_RTMP_SRC (object);
181 
182   g_free (rtmpsrc->uri);
183   rtmpsrc->uri = NULL;
184 
185 #ifdef G_OS_WIN32
186   WSACleanup ();
187 #endif
188 
189   G_OBJECT_CLASS (parent_class)->finalize (object);
190 }
191 
192 /*
193  * URI interface support.
194  */
195 
196 static GstURIType
gst_rtmp_src_uri_get_type(GType type)197 gst_rtmp_src_uri_get_type (GType type)
198 {
199   return GST_URI_SRC;
200 }
201 
202 static const gchar *const *
gst_rtmp_src_uri_get_protocols(GType type)203 gst_rtmp_src_uri_get_protocols (GType type)
204 {
205   static const gchar *protocols[] =
206       { "rtmp", "rtmpt", "rtmps", "rtmpe", "rtmfp", "rtmpte", "rtmpts", NULL };
207 
208   return protocols;
209 }
210 
211 static gchar *
gst_rtmp_src_uri_get_uri(GstURIHandler * handler)212 gst_rtmp_src_uri_get_uri (GstURIHandler * handler)
213 {
214   GstRTMPSrc *src = GST_RTMP_SRC (handler);
215 
216   /* FIXME: make thread-safe */
217   return g_strdup (src->uri);
218 }
219 
220 static gboolean
gst_rtmp_src_uri_set_uri(GstURIHandler * handler,const gchar * uri,GError ** error)221 gst_rtmp_src_uri_set_uri (GstURIHandler * handler, const gchar * uri,
222     GError ** error)
223 {
224   GstRTMPSrc *src = GST_RTMP_SRC (handler);
225 
226   if (GST_STATE (src) >= GST_STATE_PAUSED) {
227     g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
228         "Changing the URI on rtmpsrc when it is running is not supported");
229     return FALSE;
230   }
231 
232   g_free (src->uri);
233   src->uri = NULL;
234 
235   if (uri != NULL) {
236     int protocol;
237     AVal host;
238     unsigned int port;
239     AVal playpath, app;
240 
241     if (!RTMP_ParseURL (uri, &protocol, &host, &port, &playpath, &app) ||
242         !host.av_len || !playpath.av_len) {
243       GST_ERROR_OBJECT (src, "Failed to parse URI %s", uri);
244       g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
245           "Could not parse RTMP URI");
246 #ifndef _MSC_VER
247       /* FIXME: we should not be freeing RTMP internals to avoid leaking */
248       free (playpath.av_val);
249 #endif
250       return FALSE;
251     }
252 #ifndef _MSC_VER
253     free (playpath.av_val);
254 #endif
255     src->uri = g_strdup (uri);
256   }
257 
258   GST_DEBUG_OBJECT (src, "Changed URI to %s", GST_STR_NULL (uri));
259 
260   return TRUE;
261 }
262 
263 static void
gst_rtmp_src_uri_handler_init(gpointer g_iface,gpointer iface_data)264 gst_rtmp_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
265 {
266   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
267 
268   iface->get_type = gst_rtmp_src_uri_get_type;
269   iface->get_protocols = gst_rtmp_src_uri_get_protocols;
270   iface->get_uri = gst_rtmp_src_uri_get_uri;
271   iface->set_uri = gst_rtmp_src_uri_set_uri;
272 }
273 
274 static void
gst_rtmp_src_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)275 gst_rtmp_src_set_property (GObject * object, guint prop_id,
276     const GValue * value, GParamSpec * pspec)
277 {
278   GstRTMPSrc *src;
279 
280   src = GST_RTMP_SRC (object);
281 
282   switch (prop_id) {
283     case PROP_LOCATION:{
284       gst_rtmp_src_uri_set_uri (GST_URI_HANDLER (src),
285           g_value_get_string (value), NULL);
286       break;
287     }
288     case PROP_TIMEOUT:{
289       src->timeout = g_value_get_int (value);
290       break;
291     }
292     default:
293       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
294       break;
295   }
296 }
297 
298 static void
gst_rtmp_src_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)299 gst_rtmp_src_get_property (GObject * object, guint prop_id, GValue * value,
300     GParamSpec * pspec)
301 {
302   GstRTMPSrc *src;
303 
304   src = GST_RTMP_SRC (object);
305 
306   switch (prop_id) {
307     case PROP_LOCATION:
308       g_value_set_string (value, src->uri);
309       break;
310     case PROP_TIMEOUT:
311       g_value_set_int (value, src->timeout);
312       break;
313     default:
314       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
315       break;
316   }
317 }
318 
319 /*
320  * Read a new buffer from src->reqoffset, takes care of events
321  * and seeking and such.
322  */
323 static GstFlowReturn
gst_rtmp_src_create(GstPushSrc * pushsrc,GstBuffer ** buffer)324 gst_rtmp_src_create (GstPushSrc * pushsrc, GstBuffer ** buffer)
325 {
326   GstRTMPSrc *src;
327   GstBuffer *buf;
328   GstMapInfo map;
329   guint8 *data;
330   guint todo;
331   gsize bsize;
332   int size;
333 
334   src = GST_RTMP_SRC (pushsrc);
335 
336   g_return_val_if_fail (src->rtmp != NULL, GST_FLOW_ERROR);
337 
338   if (!RTMP_IsConnected (src->rtmp)) {
339     GST_DEBUG_OBJECT (src, "reconnecting");
340     if (!gst_rtmp_src_connect (src))
341       return GST_FLOW_ERROR;
342   }
343 
344   size = GST_BASE_SRC_CAST (pushsrc)->blocksize;
345 
346   GST_DEBUG ("reading from %" G_GUINT64_FORMAT
347       ", size %u", src->cur_offset, size);
348 
349   buf = gst_buffer_new_allocate (NULL, size, NULL);
350   if (G_UNLIKELY (buf == NULL)) {
351     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
352     return GST_FLOW_ERROR;
353   }
354 
355   todo = size;
356   gst_buffer_map (buf, &map, GST_MAP_WRITE);
357   data = map.data;
358   bsize = 0;
359 
360   while (todo > 0) {
361     int read = RTMP_Read (src->rtmp, (char *) data, todo);
362 
363     if (G_UNLIKELY (read == 0 && todo == size))
364       goto eos;
365 
366     if (G_UNLIKELY (read == 0))
367       break;
368 
369     if (G_UNLIKELY (read < 0))
370       goto read_failed;
371 
372     if (read < todo) {
373       data += read;
374       todo -= read;
375       bsize += read;
376     } else {
377       bsize += todo;
378       todo = 0;
379     }
380     GST_LOG ("  got size %d", read);
381   }
382   gst_buffer_unmap (buf, &map);
383   gst_buffer_resize (buf, 0, bsize);
384 
385   if (src->discont) {
386     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
387     src->discont = FALSE;
388   }
389 
390   GST_BUFFER_TIMESTAMP (buf) = src->last_timestamp;
391   GST_BUFFER_OFFSET (buf) = src->cur_offset;
392 
393   src->cur_offset += size;
394   if (src->last_timestamp == GST_CLOCK_TIME_NONE)
395     src->last_timestamp = src->rtmp->m_mediaStamp * GST_MSECOND;
396   else
397     src->last_timestamp =
398         MAX (src->last_timestamp, src->rtmp->m_mediaStamp * GST_MSECOND);
399 
400   GST_LOG_OBJECT (src, "Created buffer of size %u at %" G_GINT64_FORMAT
401       " with timestamp %" GST_TIME_FORMAT, size, GST_BUFFER_OFFSET (buf),
402       GST_TIME_ARGS (GST_BUFFER_TIMESTAMP (buf)));
403 
404 
405   /* we're done, return the buffer */
406   *buffer = buf;
407 
408   return GST_FLOW_OK;
409 
410 read_failed:
411   {
412     gst_buffer_unmap (buf, &map);
413     gst_buffer_unref (buf);
414     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL), ("Failed to read data"));
415     return GST_FLOW_ERROR;
416   }
417 eos:
418   {
419     gst_buffer_unmap (buf, &map);
420     gst_buffer_unref (buf);
421     if (src->cur_offset == 0) {
422       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
423           ("Failed to read any data from stream, check your URL"));
424       return GST_FLOW_ERROR;
425     } else {
426       GST_DEBUG_OBJECT (src, "Reading data gave EOS");
427       return GST_FLOW_EOS;
428     }
429   }
430 }
431 
432 static gboolean
gst_rtmp_src_query(GstBaseSrc * basesrc,GstQuery * query)433 gst_rtmp_src_query (GstBaseSrc * basesrc, GstQuery * query)
434 {
435   gboolean ret = FALSE;
436   GstRTMPSrc *src = GST_RTMP_SRC (basesrc);
437 
438   switch (GST_QUERY_TYPE (query)) {
439     case GST_QUERY_URI:
440       gst_query_set_uri (query, src->uri);
441       ret = TRUE;
442       break;
443     case GST_QUERY_POSITION:{
444       GstFormat format;
445 
446       gst_query_parse_position (query, &format, NULL);
447       if (format == GST_FORMAT_TIME) {
448         gst_query_set_position (query, format, src->last_timestamp);
449         ret = TRUE;
450       }
451       break;
452     }
453     case GST_QUERY_DURATION:{
454       GstFormat format;
455       gdouble duration;
456 
457       gst_query_parse_duration (query, &format, NULL);
458       if (format == GST_FORMAT_TIME && src->rtmp) {
459         duration = RTMP_GetDuration (src->rtmp);
460         if (duration != 0.0) {
461           gst_query_set_duration (query, format, duration * GST_SECOND);
462           ret = TRUE;
463         }
464       }
465       break;
466     }
467     case GST_QUERY_SCHEDULING:{
468       gst_query_set_scheduling (query,
469           GST_SCHEDULING_FLAG_SEQUENTIAL |
470           GST_SCHEDULING_FLAG_BANDWIDTH_LIMITED, 1, -1, 0);
471       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
472 
473       ret = TRUE;
474       break;
475     }
476     default:
477       ret = FALSE;
478       break;
479   }
480 
481   if (!ret)
482     ret = GST_BASE_SRC_CLASS (parent_class)->query (basesrc, query);
483 
484   return ret;
485 }
486 
487 static gboolean
gst_rtmp_src_is_seekable(GstBaseSrc * basesrc)488 gst_rtmp_src_is_seekable (GstBaseSrc * basesrc)
489 {
490   GstRTMPSrc *src;
491 
492   src = GST_RTMP_SRC (basesrc);
493 
494   return src->seekable;
495 }
496 
497 static gboolean
gst_rtmp_src_prepare_seek_segment(GstBaseSrc * basesrc,GstEvent * event,GstSegment * segment)498 gst_rtmp_src_prepare_seek_segment (GstBaseSrc * basesrc, GstEvent * event,
499     GstSegment * segment)
500 {
501   GstRTMPSrc *src;
502   GstSeekType cur_type, stop_type;
503   gint64 cur, stop;
504   GstSeekFlags flags;
505   GstFormat format;
506   gdouble rate;
507 
508   src = GST_RTMP_SRC (basesrc);
509 
510   gst_event_parse_seek (event, &rate, &format, &flags,
511       &cur_type, &cur, &stop_type, &stop);
512 
513   if (!src->seekable) {
514     GST_LOG_OBJECT (src, "Not a seekable stream");
515     return FALSE;
516   }
517 
518   if (!src->rtmp) {
519     GST_LOG_OBJECT (src, "Not connected yet");
520     return FALSE;
521   }
522 
523   if (format != GST_FORMAT_TIME) {
524     GST_LOG_OBJECT (src, "Seeking only supported in TIME format");
525     return FALSE;
526   }
527 
528   if (stop_type != GST_SEEK_TYPE_NONE) {
529     GST_LOG_OBJECT (src, "Setting a stop position is not supported");
530     return FALSE;
531   }
532 
533   gst_segment_init (segment, GST_FORMAT_TIME);
534   gst_segment_do_seek (segment, rate, format, flags, cur_type, cur, stop_type,
535       stop, NULL);
536 
537   return TRUE;
538 }
539 
540 static gboolean
gst_rtmp_src_do_seek(GstBaseSrc * basesrc,GstSegment * segment)541 gst_rtmp_src_do_seek (GstBaseSrc * basesrc, GstSegment * segment)
542 {
543   GstRTMPSrc *src;
544 
545   src = GST_RTMP_SRC (basesrc);
546 
547   if (segment->format != GST_FORMAT_TIME) {
548     GST_LOG_OBJECT (src, "Only time based seeks are supported");
549     return FALSE;
550   }
551 
552   if (!src->rtmp) {
553     GST_LOG_OBJECT (src, "Not connected yet");
554     return FALSE;
555   }
556 
557   /* Initial seek */
558   if (src->cur_offset == 0 && segment->start == 0)
559     goto success;
560 
561   if (!src->seekable) {
562     GST_LOG_OBJECT (src, "Not a seekable stream");
563     return FALSE;
564   }
565 
566   /* If we have just disconnected in unlock(), we need to re-connect
567    * and also let librtmp read some data before sending a seek,
568    * otherwise it will stall. Calling create() does both. */
569   if (!RTMP_IsConnected (src->rtmp)) {
570     GstBuffer *buffer = NULL;
571     gst_rtmp_src_create (GST_PUSH_SRC (basesrc), &buffer);
572     gst_buffer_replace (&buffer, NULL);
573   }
574 
575   src->last_timestamp = GST_CLOCK_TIME_NONE;
576   if (!RTMP_SendSeek (src->rtmp, segment->start / GST_MSECOND)) {
577     GST_ERROR_OBJECT (src, "Seeking failed");
578     src->seekable = FALSE;
579     return FALSE;
580   }
581 
582 success:
583   /* This is set here so that the call to create() above doesn't clear it */
584   src->discont = TRUE;
585 
586   GST_DEBUG_OBJECT (src, "Seek to %" GST_TIME_FORMAT " successfull",
587       GST_TIME_ARGS (segment->start));
588 
589   return TRUE;
590 }
591 
592 static gboolean
gst_rtmp_src_connect(GstRTMPSrc * src)593 gst_rtmp_src_connect (GstRTMPSrc * src)
594 {
595   RTMP_Init (src->rtmp);
596   src->rtmp->Link.timeout = src->timeout;
597   if (!RTMP_SetupURL (src->rtmp, src->uri)) {
598     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
599         ("Failed to setup URL '%s'", src->uri));
600     return FALSE;
601   }
602   src->seekable = !(src->rtmp->Link.lFlags & RTMP_LF_LIVE);
603   GST_INFO_OBJECT (src, "seekable %d", src->seekable);
604 
605   /* open if required */
606   if (!RTMP_IsConnected (src->rtmp)) {
607     if (!RTMP_Connect (src->rtmp, NULL)) {
608       GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
609           ("Could not connect to RTMP stream \"%s\" for reading", src->uri));
610       return FALSE;
611     }
612   }
613 
614   return TRUE;
615 }
616 
617 /* open the file, do stuff necessary to go to PAUSED state */
618 static gboolean
gst_rtmp_src_start(GstBaseSrc * basesrc)619 gst_rtmp_src_start (GstBaseSrc * basesrc)
620 {
621   GstRTMPSrc *src;
622 
623   src = GST_RTMP_SRC (basesrc);
624 
625   if (!src->uri) {
626     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL), ("No filename given"));
627     return FALSE;
628   }
629 
630   src->cur_offset = 0;
631   src->last_timestamp = 0;
632   src->discont = TRUE;
633 
634   src->rtmp = RTMP_Alloc ();
635   if (!src->rtmp) {
636     GST_ERROR_OBJECT (src, "Could not allocate librtmp's RTMP context");
637     goto error;
638   }
639 
640   if (!gst_rtmp_src_connect (src))
641     goto error;
642 
643   return TRUE;
644 
645 error:
646   if (src->rtmp) {
647     RTMP_Free (src->rtmp);
648     src->rtmp = NULL;
649   }
650   return FALSE;
651 }
652 
653 static gboolean
gst_rtmp_src_unlock(GstBaseSrc * basesrc)654 gst_rtmp_src_unlock (GstBaseSrc * basesrc)
655 {
656   GstRTMPSrc *rtmpsrc = GST_RTMP_SRC (basesrc);
657 
658   GST_DEBUG_OBJECT (rtmpsrc, "unlock");
659 
660   /* This closes the socket, which means that any pending socket calls
661    * error out. */
662   if (rtmpsrc->rtmp) {
663     RTMP_Close (rtmpsrc->rtmp);
664   }
665 
666   return TRUE;
667 }
668 
669 
670 static gboolean
gst_rtmp_src_stop(GstBaseSrc * basesrc)671 gst_rtmp_src_stop (GstBaseSrc * basesrc)
672 {
673   GstRTMPSrc *src;
674 
675   src = GST_RTMP_SRC (basesrc);
676 
677   if (src->rtmp) {
678     RTMP_Free (src->rtmp);
679     src->rtmp = NULL;
680   }
681 
682   src->cur_offset = 0;
683   src->last_timestamp = 0;
684   src->discont = TRUE;
685 
686   return TRUE;
687 }
688