1 /* GStreamer
2  *
3  * Copyright (C) 2018 Collabora Ltd.
4  *               Author: Nicolas Dufresne <nicolas.dufresne@collabora.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 #include <gst/rtp/gstrtpbuffer.h>
22 #include <gst/check/gstcheck.h>
23 #include <gst/check/gstharness.h>
24 
/* Parameters of the synthetic RTP stream produced by create_buffer(). */

/* Value advertised as "clock-rate" in the test caps. */
#define TEST_BUF_CLOCK_RATE 8000
/* RTP payload type written into every test buffer. */
#define TEST_BUF_PT 0
/* SSRC used by test_event_forwarding and expected back in the src caps. */
#define TEST_BUF_SSRC 0x01BADBAD
/* Amount of media time, in milliseconds, covered by one test buffer. */
#define TEST_BUF_MS  20
/* Same duration expressed as a multiple of GST_MSECOND; used to derive DTS. */
#define TEST_BUF_DURATION (TEST_BUF_MS * GST_MSECOND)
/* Payload size handed to gst_rtp_buffer_new_allocate(); evaluates to 1280.
 * NOTE(review): the unit of the 64000 factor is not stated here -- confirm
 * whether bytes or bits per second were intended. */
#define TEST_BUF_SIZE (64000 * TEST_BUF_MS / 1000)
/* RTP timestamp increment per buffer, in clock-rate ticks; evaluates to 160. */
#define TEST_RTP_TS_DURATION (TEST_BUF_CLOCK_RATE * TEST_BUF_MS / 1000)
32 
33 static GstCaps *
generate_caps(void)34 generate_caps (void)
35 {
36   return gst_caps_new_simple ("application/x-rtp",
37       "media", G_TYPE_STRING, "audio",
38       "clock-rate", G_TYPE_INT, TEST_BUF_CLOCK_RATE, NULL);
39 }
40 
41 static GstBuffer *
create_buffer(guint seq_num,guint32 ssrc)42 create_buffer (guint seq_num, guint32 ssrc)
43 {
44   GstBuffer *buf;
45   guint8 *payload;
46   guint i;
47   GstClockTime dts = seq_num * TEST_BUF_DURATION;
48   guint32 rtp_ts = seq_num * TEST_RTP_TS_DURATION;
49   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
50 
51   buf = gst_rtp_buffer_new_allocate (TEST_BUF_SIZE, 0, 0);
52   GST_BUFFER_DTS (buf) = dts;
53 
54   gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp);
55   gst_rtp_buffer_set_payload_type (&rtp, TEST_BUF_PT);
56   gst_rtp_buffer_set_seq (&rtp, seq_num);
57   gst_rtp_buffer_set_timestamp (&rtp, rtp_ts);
58   gst_rtp_buffer_set_ssrc (&rtp, ssrc);
59 
60   payload = gst_rtp_buffer_get_payload (&rtp);
61   for (i = 0; i < TEST_BUF_SIZE; i++)
62     payload[i] = 0xff;
63 
64   gst_rtp_buffer_unmap (&rtp);
65 
66   return buf;
67 }
68 
/* Harnesses attached to the pads of the single rtpssrcdemux instance used by
 * test_event_forwarding. */
typedef struct
{
  /* Harness that created the element; feeds its "sink" pad. */
  GstHarness *rtp_sink;
  /* Harness feeding the element's "rtcp_sink" pad. */
  GstHarness *rtcp_sink;
  /* Set by rtpssrcdemux_pad_added() for the pad whose name starts "src_". */
  GstHarness *rtp_src;
  /* Set by rtpssrcdemux_pad_added() for the pad whose name starts
   * "rtcp_src_". */
  GstHarness *rtcp_src;
} TestContext;
76 
77 static void
rtpssrcdemux_pad_added(G_GNUC_UNUSED GstElement * demux,GstPad * src_pad,TestContext * ctx)78 rtpssrcdemux_pad_added (G_GNUC_UNUSED GstElement * demux, GstPad * src_pad,
79     TestContext * ctx)
80 {
81   GstHarness *h;
82 
83   h = gst_harness_new_with_element (ctx->rtp_sink->element, NULL,
84       GST_PAD_NAME (src_pad));
85 
86   /* FIXME We should also check that pads have current caps, but this is not
87    * currently the case as both pads are created when the first pad receive a
88    * buffer. If the other pad is not linked, you'll get a pad without caps.
89    * Changing this implies not having both pads on 'on-new-ssrc' which would
90    * break rtpbin assumption. */
91 
92   if (g_str_has_prefix (GST_PAD_NAME (src_pad), "src_")) {
93     g_assert (ctx->rtp_src == NULL);
94     ctx->rtp_src = h;
95   } else if (g_str_has_prefix (GST_PAD_NAME (src_pad), "rtcp_src_")) {
96     g_assert (ctx->rtcp_src == NULL);
97     ctx->rtcp_src = h;
98   } else {
99     g_assert_not_reached ();
100   }
101 }
102 
GST_START_TEST(test_event_forwarding)103 GST_START_TEST (test_event_forwarding)
104 {
105   TestContext ctx = { NULL, };
106   GstHarness *h;
107   GstEvent *event;
108   GstCaps *caps;
109   GstStructure *s;
110   guint ssrc;
111 
112   ctx.rtp_sink = h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink",
113       NULL);
114   g_signal_connect (h->element, "pad_added",
115       G_CALLBACK (rtpssrcdemux_pad_added), &ctx);
116 
117   ctx.rtcp_sink = gst_harness_new_with_element (h->element, "rtcp_sink", NULL);
118 
119   gst_harness_set_src_caps (h, generate_caps ());
120   gst_harness_push (h, create_buffer (0, TEST_BUF_SSRC));
121 
122   g_assert (ctx.rtp_src);
123   g_assert (ctx.rtcp_src);
124 
125   gst_harness_push_event (h, gst_event_new_eos ());
126 
127   /* We expect stream-start/caps/segment/eos */
128   g_assert_cmpint (gst_harness_events_in_queue (ctx.rtp_src), ==, 4);
129 
130   event = gst_harness_pull_event (ctx.rtp_src);
131   g_assert_cmpint (event->type, ==, GST_EVENT_STREAM_START);
132   gst_event_unref (event);
133 
134   event = gst_harness_pull_event (ctx.rtp_src);
135   g_assert_cmpint (event->type, ==, GST_EVENT_CAPS);
136   gst_event_parse_caps (event, &caps);
137   s = gst_caps_get_structure (caps, 0);
138   g_assert (gst_structure_has_field (s, "ssrc"));
139   g_assert (gst_structure_get_uint (s, "ssrc", &ssrc));
140   g_assert_cmpuint (ssrc, ==, TEST_BUF_SSRC);
141   gst_event_unref (event);
142 
143   event = gst_harness_pull_event (ctx.rtp_src);
144   g_assert_cmpint (event->type, ==, GST_EVENT_SEGMENT);
145   gst_event_unref (event);
146 
147   event = gst_harness_pull_event (ctx.rtp_src);
148   g_assert_cmpint (event->type, ==, GST_EVENT_EOS);
149   gst_event_unref (event);
150 
151   /* We pushed on the RTP pad, no events should have reached the RTCP pad */
152   g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 0);
153 
154   /* push EOS on the rtcp sink pad, to make sure it EOS properly, the harness
155    * will create the missing stream-start */
156   gst_harness_push_event (ctx.rtcp_sink, gst_event_new_eos ());
157 
158   g_assert_cmpint (gst_harness_events_in_queue (ctx.rtp_src), ==, 0);
159   g_assert_cmpint (gst_harness_events_in_queue (ctx.rtcp_src), ==, 2);
160 
161   event = gst_harness_pull_event (ctx.rtcp_src);
162   g_assert_cmpint (event->type, ==, GST_EVENT_STREAM_START);
163   gst_event_unref (event);
164 
165   event = gst_harness_pull_event (ctx.rtcp_src);
166   g_assert_cmpint (event->type, ==, GST_EVENT_EOS);
167   gst_event_unref (event);
168 
169   gst_harness_teardown (ctx.rtp_src);
170   gst_harness_teardown (ctx.rtcp_src);
171   gst_harness_teardown (ctx.rtcp_sink);
172   gst_harness_teardown (ctx.rtp_sink);
173 }
174 
175 GST_END_TEST;
176 
/* State shared between test_oob_event_locking and new_ssrc_pad_cb(). */
typedef struct
{
  /* Handshake flag accessed with g_atomic_int_*: the callback sets it to 1,
   * the test resets it to 0. */
  gint ready;
  /* Held by the test while it pushes the OOB event; the callback locks and
   * unlocks it before returning. */
  GMutex mutex;
  /* Initialised and cleared by the test; not waited on or signalled in the
   * code visible here. */
  GCond cond;
} LockTestContext;
183 
184 static void
new_ssrc_pad_cb(GstElement * element,guint ssrc,GstPad * pad,LockTestContext * ctx)185 new_ssrc_pad_cb (GstElement * element, guint ssrc, GstPad * pad,
186     LockTestContext * ctx)
187 {
188   g_message ("Signalling ready");
189   g_atomic_int_set (&ctx->ready, 1);
190 
191   g_message ("Waiting no more ready");
192   while (g_atomic_int_get (&ctx->ready))
193     g_usleep (G_USEC_PER_SEC / 100);
194 
195   g_mutex_lock (&ctx->mutex);
196   g_mutex_unlock (&ctx->mutex);
197 }
198 
199 static gpointer
push_buffer_func(gpointer user_data)200 push_buffer_func (gpointer user_data)
201 {
202   GstHarness *h = user_data;
203   gst_harness_push (h, create_buffer (0, 0xdeadbeef));
204   return NULL;
205 }
206 
GST_START_TEST(test_oob_event_locking)207 GST_START_TEST (test_oob_event_locking)
208 {
209   GstHarness *h = gst_harness_new_with_padnames ("rtpssrcdemux", "sink", NULL);
210   LockTestContext ctx = { FALSE, };
211   GThread *thread;
212 
213   g_mutex_init (&ctx.mutex);
214   g_cond_init (&ctx.cond);
215 
216   gst_harness_set_src_caps_str (h, "application/x-rtp");
217   g_signal_connect (h->element,
218       "new-ssrc-pad", G_CALLBACK (new_ssrc_pad_cb), &ctx);
219 
220   thread = g_thread_new ("streaming-thread", push_buffer_func, h);
221 
222   g_mutex_lock (&ctx.mutex);
223 
224   g_message ("Waiting for ready");
225   while (!g_atomic_int_get (&ctx.ready))
226     g_usleep (G_USEC_PER_SEC / 100);
227   g_message ("Signal no more ready");
228   g_atomic_int_set (&ctx.ready, 0);
229 
230   gst_harness_push_event (h,
231       gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB, NULL));
232 
233   g_mutex_unlock (&ctx.mutex);
234 
235   g_thread_join (thread);
236   g_mutex_clear (&ctx.mutex);
237   g_cond_clear (&ctx.cond);
238   gst_harness_teardown (h);
239 }
240 
241 GST_END_TEST;
242 
243 static Suite *
rtpssrcdemux_suite(void)244 rtpssrcdemux_suite (void)
245 {
246   Suite *s = suite_create ("rtpssrcdemux");
247   TCase *tc_chain = tcase_create ("general");
248 
249   suite_add_tcase (s, tc_chain);
250   tcase_add_test (tc_chain, test_event_forwarding);
251   tcase_add_test (tc_chain, test_oob_event_locking);
252 
253   return s;
254 }
255 
/* Test program entry point, generated by the GstCheck GST_CHECK_MAIN macro.
 * NOTE(review): the macro presumably derives the suite constructor name
 * (rtpssrcdemux_suite) from its argument -- confirm in gstcheck.h. */
GST_CHECK_MAIN (rtpssrcdemux);
257