1 /* GStreamer
2  * Copyright (C) 2013 Collabora Ltd.
3  *   @author Torrie Fischer <torrie.fischer@collabora.co.uk>
4  *
5  * This library is free software; you can redistribute it and/or
6  * modify it under the terms of the GNU Library General Public
7  * License as published by the Free Software Foundation; either
8  * version 2 of the License, or (at your option) any later version.
9  *
10  * This library is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * Library General Public License for more details.
14  *
15  * You should have received a copy of the GNU Library General Public
16  * License along with this library; if not, write to the
17  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
18  * Boston, MA 02110-1301, USA.
19  */
20 #include <gst/gst.h>
21 #include <gst/rtp/rtp.h>
22 #include <stdlib.h>
23 
24 /*
25  * RTP receiver with RFC4588 retransmission handling enabled
26  *
27  *  In this example we have two RTP sessions, one for video and one for audio.
28  *  Video is received on port 5000, with its RTCP stream received on port 5001
29  *  and sent on port 5005. Audio is received on port 5005, with its RTCP stream
30  *  received on port 5006 and sent on port 5011.
31  *
32  *  In both sessions, we set "rtprtxreceive" as the session's "aux" element
33  *  in rtpbin, which enables RFC4588 retransmission handling for that session.
34  *
35  *             .-------.      .----------.        .-----------.   .---------.   .-------------.
36  *  RTP        |udpsrc |      | rtpbin   |        |theoradepay|   |theoradec|   |autovideosink|
37  *  port=5000  |      src->recv_rtp_0 recv_rtp_0->sink       src->sink     src->sink          |
38  *             '-------'      |          |        '-----------'   '---------'   '-------------'
39  *                            |          |
40  *                            |          |     .-------.
41  *                            |          |     |udpsink|  RTCP
42  *                            |  send_rtcp_0->sink     | port=5005
43  *             .-------.      |          |     '-------' sync=false
44  *  RTCP       |udpsrc |      |          |               async=false
45  *  port=5001  |     src->recv_rtcp_0    |
46  *             '-------'      |          |
47  *                            |          |
48  *             .-------.      |          |        .---------.   .-------.   .-------------.
49  *  RTP        |udpsrc |      |          |        |pcmadepay|   |alawdec|   |autoaudiosink|
50  *  port=5006  |      src->recv_rtp_1 recv_rtp_1->sink     src->sink   src->sink          |
51  *             '-------'      |          |        '---------'   '-------'   '-------------'
52  *                            |          |
53  *                            |          |     .-------.
54  *                            |          |     |udpsink|  RTCP
55  *                            |  send_rtcp_1->sink     | port=5011
56  *             .-------.      |          |     '-------' sync=false
57  *  RTCP       |udpsrc |      |          |               async=false
58  *  port=5007  |     src->recv_rtcp_1    |
59  *             '-------'      '----------'
60  *
61  */
62 
63 GMainLoop *loop = NULL;
64 
65 typedef struct _SessionData
66 {
67   int ref;
68   GstElement *rtpbin;
69   guint sessionNum;
70   GstCaps *caps;
71   GstElement *output;
72 } SessionData;
73 
74 static SessionData *
session_ref(SessionData * data)75 session_ref (SessionData * data)
76 {
77   g_atomic_int_inc (&data->ref);
78   return data;
79 }
80 
81 static void
session_unref(gpointer data)82 session_unref (gpointer data)
83 {
84   SessionData *session = (SessionData *) data;
85   if (g_atomic_int_dec_and_test (&session->ref)) {
86     g_object_unref (session->rtpbin);
87     gst_caps_unref (session->caps);
88     g_free (session);
89   }
90 }
91 
92 static SessionData *
session_new(guint sessionNum)93 session_new (guint sessionNum)
94 {
95   SessionData *ret = g_new0 (SessionData, 1);
96   ret->sessionNum = sessionNum;
97   return session_ref (ret);
98 }
99 
100 static void
setup_ghost_sink(GstElement * sink,GstBin * bin)101 setup_ghost_sink (GstElement * sink, GstBin * bin)
102 {
103   GstPad *sinkPad = gst_element_get_static_pad (sink, "sink");
104   GstPad *binPad = gst_ghost_pad_new ("sink", sinkPad);
105   gst_element_add_pad (GST_ELEMENT (bin), binPad);
106 }
107 
108 static SessionData *
make_audio_session(guint sessionNum)109 make_audio_session (guint sessionNum)
110 {
111   SessionData *ret = session_new (sessionNum);
112   GstBin *bin = GST_BIN (gst_bin_new ("audio"));
113   GstElement *queue = gst_element_factory_make ("queue", NULL);
114   GstElement *sink = gst_element_factory_make ("autoaudiosink", NULL);
115   GstElement *audioconvert = gst_element_factory_make ("audioconvert", NULL);
116   GstElement *audioresample = gst_element_factory_make ("audioresample", NULL);
117   GstElement *depayloader = gst_element_factory_make ("rtppcmadepay", NULL);
118   GstElement *decoder = gst_element_factory_make ("alawdec", NULL);
119 
120   gst_bin_add_many (bin, queue, depayloader, decoder, audioconvert,
121       audioresample, sink, NULL);
122   gst_element_link_many (queue, depayloader, decoder, audioconvert,
123       audioresample, sink, NULL);
124 
125   setup_ghost_sink (queue, bin);
126 
127   ret->output = GST_ELEMENT (bin);
128   ret->caps = gst_caps_new_simple ("application/x-rtp",
129       "media", G_TYPE_STRING, "audio",
130       "clock-rate", G_TYPE_INT, 8000,
131       "encoding-name", G_TYPE_STRING, "PCMA", NULL);
132 
133   return ret;
134 }
135 
136 static SessionData *
make_video_session(guint sessionNum)137 make_video_session (guint sessionNum)
138 {
139   SessionData *ret = session_new (sessionNum);
140   GstBin *bin = GST_BIN (gst_bin_new ("video"));
141   GstElement *queue = gst_element_factory_make ("queue", NULL);
142   GstElement *depayloader = gst_element_factory_make ("rtptheoradepay", NULL);
143   GstElement *decoder = gst_element_factory_make ("theoradec", NULL);
144   GstElement *converter = gst_element_factory_make ("videoconvert", NULL);
145   GstElement *sink = gst_element_factory_make ("autovideosink", NULL);
146 
147   gst_bin_add_many (bin, depayloader, decoder, converter, queue, sink, NULL);
148   gst_element_link_many (queue, depayloader, decoder, converter, sink, NULL);
149 
150   setup_ghost_sink (queue, bin);
151 
152   ret->output = GST_ELEMENT (bin);
153   ret->caps = gst_caps_new_simple ("application/x-rtp",
154       "media", G_TYPE_STRING, "video",
155       "clock-rate", G_TYPE_INT, 90000,
156       "encoding-name", G_TYPE_STRING, "THEORA", NULL);
157 
158   return ret;
159 }
160 
161 static GstCaps *
request_pt_map(GstElement * rtpbin,guint session,guint pt,gpointer user_data)162 request_pt_map (GstElement * rtpbin, guint session, guint pt,
163     gpointer user_data)
164 {
165   SessionData *data = (SessionData *) user_data;
166   gchar *caps_str;
167   g_print ("Looking for caps for pt %u in session %u, have %u\n", pt, session,
168       data->sessionNum);
169   if (session == data->sessionNum) {
170     caps_str = gst_caps_to_string (data->caps);
171     g_print ("Returning %s\n", caps_str);
172     g_free (caps_str);
173     return gst_caps_ref (data->caps);
174   }
175   return NULL;
176 }
177 
178 static void
cb_eos(GstBus * bus,GstMessage * message,gpointer data)179 cb_eos (GstBus * bus, GstMessage * message, gpointer data)
180 {
181   g_print ("Got EOS\n");
182   g_main_loop_quit (loop);
183 }
184 
185 static void
cb_state(GstBus * bus,GstMessage * message,gpointer data)186 cb_state (GstBus * bus, GstMessage * message, gpointer data)
187 {
188   GstObject *pipe = GST_OBJECT (data);
189   GstState old, new, pending;
190   gst_message_parse_state_changed (message, &old, &new, &pending);
191   if (message->src == pipe) {
192     g_print ("Pipeline %s changed state from %s to %s\n",
193         GST_OBJECT_NAME (message->src),
194         gst_element_state_get_name (old), gst_element_state_get_name (new));
195   }
196 }
197 
198 static void
cb_warning(GstBus * bus,GstMessage * message,gpointer data)199 cb_warning (GstBus * bus, GstMessage * message, gpointer data)
200 {
201   GError *error = NULL;
202   gst_message_parse_warning (message, &error, NULL);
203   g_printerr ("Got warning from %s: %s\n", GST_OBJECT_NAME (message->src),
204       error->message);
205   g_error_free (error);
206 }
207 
208 static void
cb_error(GstBus * bus,GstMessage * message,gpointer data)209 cb_error (GstBus * bus, GstMessage * message, gpointer data)
210 {
211   GError *error = NULL;
212   gst_message_parse_error (message, &error, NULL);
213   g_printerr ("Got error from %s: %s\n", GST_OBJECT_NAME (message->src),
214       error->message);
215   g_error_free (error);
216   g_main_loop_quit (loop);
217 }
218 
219 static void
handle_new_stream(GstElement * element,GstPad * newPad,gpointer data)220 handle_new_stream (GstElement * element, GstPad * newPad, gpointer data)
221 {
222   SessionData *session = (SessionData *) data;
223   gchar *padName;
224   gchar *myPrefix;
225 
226   padName = gst_pad_get_name (newPad);
227   myPrefix = g_strdup_printf ("recv_rtp_src_%u", session->sessionNum);
228 
229   g_print ("New pad: %s, looking for %s_*\n", padName, myPrefix);
230 
231   if (g_str_has_prefix (padName, myPrefix)) {
232     GstPad *outputSinkPad;
233     GstElement *parent;
234 
235     parent = GST_ELEMENT (gst_element_get_parent (session->rtpbin));
236     gst_bin_add (GST_BIN (parent), session->output);
237     gst_element_sync_state_with_parent (session->output);
238     gst_object_unref (parent);
239 
240     outputSinkPad = gst_element_get_static_pad (session->output, "sink");
241     g_assert_cmpint (gst_pad_link (newPad, outputSinkPad), ==, GST_PAD_LINK_OK);
242     gst_object_unref (outputSinkPad);
243 
244     g_print ("Linked!\n");
245   }
246   g_free (myPrefix);
247   g_free (padName);
248 }
249 
250 static GstElement *
request_aux_receiver(GstElement * rtpbin,guint sessid,SessionData * session)251 request_aux_receiver (GstElement * rtpbin, guint sessid, SessionData * session)
252 {
253   GstElement *rtx, *bin;
254   GstPad *pad;
255   gchar *name;
256   GstStructure *pt_map;
257 
258   GST_INFO ("creating AUX receiver");
259   bin = gst_bin_new (NULL);
260   rtx = gst_element_factory_make ("rtprtxreceive", NULL);
261   pt_map = gst_structure_new ("application/x-rtp-pt-map",
262       "8", G_TYPE_UINT, 98, "96", G_TYPE_UINT, 99, NULL);
263   g_object_set (rtx, "payload-type-map", pt_map, NULL);
264   gst_structure_free (pt_map);
265   gst_bin_add (GST_BIN (bin), rtx);
266 
267   pad = gst_element_get_static_pad (rtx, "src");
268   name = g_strdup_printf ("src_%u", sessid);
269   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
270   g_free (name);
271   gst_object_unref (pad);
272 
273   pad = gst_element_get_static_pad (rtx, "sink");
274   name = g_strdup_printf ("sink_%u", sessid);
275   gst_element_add_pad (bin, gst_ghost_pad_new (name, pad));
276   g_free (name);
277   gst_object_unref (pad);
278 
279   return bin;
280 }
281 
282 static void
join_session(GstElement * pipeline,GstElement * rtpBin,SessionData * session)283 join_session (GstElement * pipeline, GstElement * rtpBin, SessionData * session)
284 {
285   GstElement *rtpSrc;
286   GstElement *rtcpSrc;
287   GstElement *rtcpSink;
288   gchar *padName;
289   guint basePort;
290 
291   g_print ("Joining session %p\n", session);
292 
293   session->rtpbin = g_object_ref (rtpBin);
294 
295   basePort = 5000 + (session->sessionNum * 6);
296 
297   rtpSrc = gst_element_factory_make ("udpsrc", NULL);
298   rtcpSrc = gst_element_factory_make ("udpsrc", NULL);
299   rtcpSink = gst_element_factory_make ("udpsink", NULL);
300   g_object_set (rtpSrc, "port", basePort, "caps", session->caps, NULL);
301   g_object_set (rtcpSink, "port", basePort + 5, "host", "127.0.0.1", "sync",
302       FALSE, "async", FALSE, NULL);
303   g_object_set (rtcpSrc, "port", basePort + 1, NULL);
304 
305   g_print ("Connecting to %i/%i/%i\n", basePort, basePort + 1, basePort + 5);
306 
307   /* enable RFC4588 retransmission handling by setting rtprtxreceive
308    * as the "aux" element of rtpbin */
309   g_signal_connect (rtpBin, "request-aux-receiver",
310       (GCallback) request_aux_receiver, session);
311 
312   gst_bin_add_many (GST_BIN (pipeline), rtpSrc, rtcpSrc, rtcpSink, NULL);
313 
314   g_signal_connect_data (rtpBin, "pad-added", G_CALLBACK (handle_new_stream),
315       session_ref (session), (GClosureNotify) session_unref, 0);
316 
317   g_signal_connect_data (rtpBin, "request-pt-map", G_CALLBACK (request_pt_map),
318       session_ref (session), (GClosureNotify) session_unref, 0);
319 
320   padName = g_strdup_printf ("recv_rtp_sink_%u", session->sessionNum);
321   gst_element_link_pads (rtpSrc, "src", rtpBin, padName);
322   g_free (padName);
323 
324   padName = g_strdup_printf ("recv_rtcp_sink_%u", session->sessionNum);
325   gst_element_link_pads (rtcpSrc, "src", rtpBin, padName);
326   g_free (padName);
327 
328   padName = g_strdup_printf ("send_rtcp_src_%u", session->sessionNum);
329   gst_element_link_pads (rtpBin, padName, rtcpSink, "sink");
330   g_free (padName);
331 
332   session_unref (session);
333 }
334 
335 int
main(int argc,char ** argv)336 main (int argc, char **argv)
337 {
338   GstPipeline *pipe;
339   SessionData *videoSession;
340   SessionData *audioSession;
341   GstElement *rtpBin;
342   GstBus *bus;
343 
344   gst_init (&argc, &argv);
345 
346   loop = g_main_loop_new (NULL, FALSE);
347   pipe = GST_PIPELINE (gst_pipeline_new (NULL));
348 
349   bus = gst_element_get_bus (GST_ELEMENT (pipe));
350   g_signal_connect (bus, "message::error", G_CALLBACK (cb_error), pipe);
351   g_signal_connect (bus, "message::warning", G_CALLBACK (cb_warning), pipe);
352   g_signal_connect (bus, "message::state-changed", G_CALLBACK (cb_state), pipe);
353   g_signal_connect (bus, "message::eos", G_CALLBACK (cb_eos), NULL);
354   gst_bus_add_signal_watch (bus);
355   gst_object_unref (bus);
356 
357   rtpBin = gst_element_factory_make ("rtpbin", NULL);
358   gst_bin_add (GST_BIN (pipe), rtpBin);
359   g_object_set (rtpBin, "latency", 200, "do-retransmission", TRUE,
360       "rtp-profile", GST_RTP_PROFILE_AVPF, NULL);
361 
362   videoSession = make_video_session (0);
363   audioSession = make_audio_session (1);
364 
365   join_session (GST_ELEMENT (pipe), rtpBin, videoSession);
366   join_session (GST_ELEMENT (pipe), rtpBin, audioSession);
367 
368   g_print ("starting client pipeline\n");
369   gst_element_set_state (GST_ELEMENT (pipe), GST_STATE_PLAYING);
370 
371   g_main_loop_run (loop);
372 
373   g_print ("stoping client pipeline\n");
374   gst_element_set_state (GST_ELEMENT (pipe), GST_STATE_NULL);
375 
376   gst_object_unref (pipe);
377   g_main_loop_unref (loop);
378 
379   return 0;
380 }
381