1 /* RTP muxer element for GStreamer
2  *
3  * gstrtpmux.c:
4  *
5  * Copyright (C) <2007-2010> Nokia Corporation.
6  *   Contact: Zeeshan Ali <zeeshan.ali@nokia.com>
7  * Copyright (C) <2007-2010> Collabora Ltd
8  *   Contact: Olivier Crete <olivier.crete@collabora.co.uk>
9  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
10  *               2000,2005 Wim Taymans <wim@fluendo.com>
11  *
12  * This library is free software; you can redistribute it and/or
13  * modify it under the terms of the GNU Library General Public
14  * License as published by the Free Software Foundation; either
15  * version 2 of the License, or (at your option) any later version.
16  *
17  * This library is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
20  * Library General Public License for more details.
21  *
22  * You should have received a copy of the GNU Library General Public
23  * License along with this library; if not, write to the
24  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
25  * Boston, MA 02110-1301, USA.
26  */
27 
28 /**
29  * SECTION:element-rtpmux
30  * @see_also: rtpdtmfmux
31  *
32  * The rtp muxer takes multiple RTP streams having the same clock-rate and
33  * muxes into a single stream with a single SSRC.
34  *
35  * <refsect2>
36  * <title>Example pipelines</title>
37  * |[
38  * gst-launch-1.0 rtpmux name=mux ! udpsink host=127.0.0.1 port=8888        \
39  *              alsasrc ! alawenc ! rtppcmapay !                        \
40  *              application/x-rtp, payload=8, rate=8000 ! mux.sink_0    \
41  *              audiotestsrc is-live=1 !                                \
42  *              mulawenc ! rtppcmupay !                                 \
43  *              application/x-rtp, payload=0, rate=8000 ! mux.sink_1
44  * ]|
45  * In this example, an audio stream is captured from ALSA and another is
46  * generated, both are encoded into different payload types and muxed together
47  * so they can be sent on the same port.
48  * </refsect2>
49  */
50 
51 #ifdef HAVE_CONFIG_H
52 #include "config.h"
53 #endif
54 
55 #include <gst/gst.h>
56 #include <gst/rtp/gstrtpbuffer.h>
57 #include <string.h>
58 
59 #include "gstrtpmux.h"
60 
61 GST_DEBUG_CATEGORY_STATIC (gst_rtp_mux_debug);
62 #define GST_CAT_DEFAULT gst_rtp_mux_debug
63 
/* GObject property identifiers. PROP_0 is never installed as a property;
 * the remaining ids are the ones registered in the class initializer. */
enum
{
  PROP_0 = 0,
  PROP_TIMESTAMP_OFFSET,
  PROP_SEQNUM_OFFSET,
  PROP_SEQNUM,
  PROP_SSRC
};
72 
/* Default property values; -1 is the "not configured" marker that the
 * property descriptions document as "random". */
#define DEFAULT_TIMESTAMP_OFFSET (-1)
#define DEFAULT_SEQNUM_OFFSET    (-1)
#define DEFAULT_SSRC             (-1)
76 
77 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
78     GST_PAD_SRC,
79     GST_PAD_ALWAYS,
80     GST_STATIC_CAPS ("application/x-rtp")
81     );
82 
83 static GstStaticPadTemplate sink_factory = GST_STATIC_PAD_TEMPLATE ("sink_%u",
84     GST_PAD_SINK,
85     GST_PAD_REQUEST,
86     GST_STATIC_CAPS ("application/x-rtp")
87     );
88 
89 static GstPad *gst_rtp_mux_request_new_pad (GstElement * element,
90     GstPadTemplate * templ, const gchar * name, const GstCaps * caps);
91 static void gst_rtp_mux_release_pad (GstElement * element, GstPad * pad);
92 static GstFlowReturn gst_rtp_mux_chain (GstPad * pad, GstObject * parent,
93     GstBuffer * buffer);
94 static GstFlowReturn gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
95     GstBufferList * bufferlist);
96 static gboolean gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux,
97     GstCaps * caps);
98 static gboolean gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent,
99     GstEvent * event);
100 static gboolean gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent,
101     GstQuery * query);
102 
103 static GstStateChangeReturn gst_rtp_mux_change_state (GstElement *
104     element, GstStateChange transition);
105 
106 static void gst_rtp_mux_set_property (GObject * object, guint prop_id,
107     const GValue * value, GParamSpec * pspec);
108 static void gst_rtp_mux_get_property (GObject * object, guint prop_id,
109     GValue * value, GParamSpec * pspec);
110 static void gst_rtp_mux_dispose (GObject * object);
111 
112 static gboolean gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux,
113     GstEvent * event);
114 
115 G_DEFINE_TYPE (GstRTPMux, gst_rtp_mux, GST_TYPE_ELEMENT);
116 
117 
118 static void
gst_rtp_mux_class_init(GstRTPMuxClass * klass)119 gst_rtp_mux_class_init (GstRTPMuxClass * klass)
120 {
121   GObjectClass *gobject_class;
122   GstElementClass *gstelement_class;
123 
124   gobject_class = (GObjectClass *) klass;
125   gstelement_class = (GstElementClass *) klass;
126 
127 
128   gst_element_class_add_static_pad_template (gstelement_class, &src_factory);
129   gst_element_class_add_static_pad_template (gstelement_class, &sink_factory);
130 
131   gst_element_class_set_static_metadata (gstelement_class, "RTP muxer",
132       "Codec/Muxer",
133       "multiplex N rtp streams into one", "Zeeshan Ali <first.last@nokia.com>");
134 
135   gobject_class->get_property = gst_rtp_mux_get_property;
136   gobject_class->set_property = gst_rtp_mux_set_property;
137   gobject_class->dispose = gst_rtp_mux_dispose;
138 
139   klass->src_event = gst_rtp_mux_src_event_real;
140 
141   g_object_class_install_property (G_OBJECT_CLASS (klass),
142       PROP_TIMESTAMP_OFFSET, g_param_spec_int ("timestamp-offset",
143           "Timestamp Offset",
144           "Offset to add to all outgoing timestamps (-1 = random)", -1,
145           G_MAXINT, DEFAULT_TIMESTAMP_OFFSET,
146           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
147   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM_OFFSET,
148       g_param_spec_int ("seqnum-offset", "Sequence number Offset",
149           "Offset to add to all outgoing seqnum (-1 = random)", -1, G_MAXINT,
150           DEFAULT_SEQNUM_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
151   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SEQNUM,
152       g_param_spec_uint ("seqnum", "Sequence number",
153           "The RTP sequence number of the last processed packet",
154           0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
155   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_SSRC,
156       g_param_spec_uint ("ssrc", "SSRC",
157           "The SSRC of the packets (default == random)",
158           0, G_MAXUINT, DEFAULT_SSRC,
159           GST_PARAM_MUTABLE_PLAYING | G_PARAM_READWRITE |
160           G_PARAM_STATIC_STRINGS));
161 
162   gstelement_class->request_new_pad =
163       GST_DEBUG_FUNCPTR (gst_rtp_mux_request_new_pad);
164   gstelement_class->release_pad = GST_DEBUG_FUNCPTR (gst_rtp_mux_release_pad);
165   gstelement_class->change_state = GST_DEBUG_FUNCPTR (gst_rtp_mux_change_state);
166 }
167 
168 static void
gst_rtp_mux_dispose(GObject * object)169 gst_rtp_mux_dispose (GObject * object)
170 {
171   GstRTPMux *rtp_mux = GST_RTP_MUX (object);
172   GList *item;
173 
174   g_clear_object (&rtp_mux->last_pad);
175 
176 restart:
177   for (item = GST_ELEMENT_PADS (object); item; item = g_list_next (item)) {
178     GstPad *pad = GST_PAD (item->data);
179     if (GST_PAD_IS_SINK (pad)) {
180       gst_element_release_request_pad (GST_ELEMENT (object), pad);
181       goto restart;
182     }
183   }
184 
185   G_OBJECT_CLASS (gst_rtp_mux_parent_class)->dispose (object);
186 }
187 
188 static gboolean
gst_rtp_mux_src_event(GstPad * pad,GstObject * parent,GstEvent * event)189 gst_rtp_mux_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
190 {
191   GstRTPMux *rtp_mux = GST_RTP_MUX (parent);
192   GstRTPMuxClass *klass;
193   gboolean ret;
194 
195   klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
196 
197   ret = klass->src_event (rtp_mux, event);
198 
199   return ret;
200 }
201 
202 static gboolean
gst_rtp_mux_src_event_real(GstRTPMux * rtp_mux,GstEvent * event)203 gst_rtp_mux_src_event_real (GstRTPMux * rtp_mux, GstEvent * event)
204 {
205   switch (GST_EVENT_TYPE (event)) {
206     case GST_EVENT_CUSTOM_UPSTREAM:
207     {
208       const GstStructure *s = gst_event_get_structure (event);
209 
210       if (gst_structure_has_name (s, "GstRTPCollision")) {
211         guint ssrc = 0;
212 
213         if (!gst_structure_get_uint (s, "ssrc", &ssrc))
214           ssrc = -1;
215 
216         GST_DEBUG_OBJECT (rtp_mux, "collided ssrc: %x", ssrc);
217 
218         /* choose another ssrc for our stream */
219         GST_OBJECT_LOCK (rtp_mux);
220         if (ssrc == rtp_mux->current_ssrc) {
221           GstCaps *caps;
222           guint suggested_ssrc = 0;
223           guint32 new_ssrc;
224 
225           if (gst_structure_get_uint (s, "suggested-ssrc", &suggested_ssrc))
226             rtp_mux->current_ssrc = suggested_ssrc;
227 
228           while (ssrc == rtp_mux->current_ssrc)
229             rtp_mux->current_ssrc = g_random_int ();
230 
231           new_ssrc = rtp_mux->current_ssrc;
232           GST_INFO_OBJECT (rtp_mux, "New ssrc after collision %x (was: %x)",
233               new_ssrc, ssrc);
234           GST_OBJECT_UNLOCK (rtp_mux);
235 
236           caps = gst_pad_get_current_caps (rtp_mux->srcpad);
237           caps = gst_caps_make_writable (caps);
238           gst_caps_set_simple (caps, "ssrc", G_TYPE_UINT, new_ssrc, NULL);
239           gst_pad_set_caps (rtp_mux->srcpad, caps);
240           gst_caps_unref (caps);
241         } else {
242           GST_OBJECT_UNLOCK (rtp_mux);
243         }
244       }
245       break;
246     }
247     default:
248       break;
249   }
250 
251 
252   return gst_pad_event_default (rtp_mux->srcpad, GST_OBJECT (rtp_mux), event);
253 }
254 
255 static void
gst_rtp_mux_init(GstRTPMux * rtp_mux)256 gst_rtp_mux_init (GstRTPMux * rtp_mux)
257 {
258   GstElementClass *klass = GST_ELEMENT_GET_CLASS (rtp_mux);
259 
260   rtp_mux->srcpad =
261       gst_pad_new_from_template (gst_element_class_get_pad_template (klass,
262           "src"), "src");
263   gst_pad_set_event_function (rtp_mux->srcpad,
264       GST_DEBUG_FUNCPTR (gst_rtp_mux_src_event));
265   gst_pad_use_fixed_caps (rtp_mux->srcpad);
266   gst_element_add_pad (GST_ELEMENT (rtp_mux), rtp_mux->srcpad);
267 
268   rtp_mux->ssrc = DEFAULT_SSRC;
269   rtp_mux->current_ssrc = DEFAULT_SSRC;
270   rtp_mux->ts_offset = DEFAULT_TIMESTAMP_OFFSET;
271   rtp_mux->seqnum_offset = DEFAULT_SEQNUM_OFFSET;
272 
273   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
274 }
275 
276 static void
gst_rtp_mux_setup_sinkpad(GstRTPMux * rtp_mux,GstPad * sinkpad)277 gst_rtp_mux_setup_sinkpad (GstRTPMux * rtp_mux, GstPad * sinkpad)
278 {
279   GstRTPMuxPadPrivate *padpriv = g_slice_new0 (GstRTPMuxPadPrivate);
280 
281   /* setup some pad functions */
282   gst_pad_set_chain_function (sinkpad, GST_DEBUG_FUNCPTR (gst_rtp_mux_chain));
283   gst_pad_set_chain_list_function (sinkpad,
284       GST_DEBUG_FUNCPTR (gst_rtp_mux_chain_list));
285   gst_pad_set_event_function (sinkpad,
286       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_event));
287   gst_pad_set_query_function (sinkpad,
288       GST_DEBUG_FUNCPTR (gst_rtp_mux_sink_query));
289 
290 
291   gst_segment_init (&padpriv->segment, GST_FORMAT_UNDEFINED);
292 
293   gst_pad_set_element_private (sinkpad, padpriv);
294 
295   gst_pad_set_active (sinkpad, TRUE);
296   gst_element_add_pad (GST_ELEMENT (rtp_mux), sinkpad);
297 }
298 
299 static GstPad *
gst_rtp_mux_request_new_pad(GstElement * element,GstPadTemplate * templ,const gchar * req_name,const GstCaps * caps)300 gst_rtp_mux_request_new_pad (GstElement * element,
301     GstPadTemplate * templ, const gchar * req_name, const GstCaps * caps)
302 {
303   GstRTPMux *rtp_mux;
304   GstPad *newpad;
305 
306   g_return_val_if_fail (templ != NULL, NULL);
307   g_return_val_if_fail (GST_IS_RTP_MUX (element), NULL);
308 
309   rtp_mux = GST_RTP_MUX (element);
310 
311   if (templ->direction != GST_PAD_SINK) {
312     GST_WARNING_OBJECT (rtp_mux, "request pad that is not a SINK pad");
313     return NULL;
314   }
315 
316   newpad = gst_pad_new_from_template (templ, req_name);
317   if (newpad)
318     gst_rtp_mux_setup_sinkpad (rtp_mux, newpad);
319   else
320     GST_WARNING_OBJECT (rtp_mux, "failed to create request pad");
321 
322   return newpad;
323 }
324 
325 static void
gst_rtp_mux_release_pad(GstElement * element,GstPad * pad)326 gst_rtp_mux_release_pad (GstElement * element, GstPad * pad)
327 {
328   GstRTPMuxPadPrivate *padpriv;
329 
330   GST_OBJECT_LOCK (element);
331   padpriv = gst_pad_get_element_private (pad);
332   gst_pad_set_element_private (pad, NULL);
333   GST_OBJECT_UNLOCK (element);
334 
335   gst_element_remove_pad (element, pad);
336 
337   if (padpriv) {
338     g_slice_free (GstRTPMuxPadPrivate, padpriv);
339   }
340 }
341 
342 /* Put our own timestamp-offset on the buffer */
343 static void
gst_rtp_mux_readjust_rtp_timestamp_locked(GstRTPMux * rtp_mux,GstRTPMuxPadPrivate * padpriv,GstRTPBuffer * rtpbuffer)344 gst_rtp_mux_readjust_rtp_timestamp_locked (GstRTPMux * rtp_mux,
345     GstRTPMuxPadPrivate * padpriv, GstRTPBuffer * rtpbuffer)
346 {
347   guint32 ts;
348   guint32 sink_ts_base = 0;
349 
350   if (padpriv && padpriv->have_timestamp_offset)
351     sink_ts_base = padpriv->timestamp_offset;
352 
353   ts = gst_rtp_buffer_get_timestamp (rtpbuffer) - sink_ts_base +
354       rtp_mux->ts_base;
355   GST_LOG_OBJECT (rtp_mux, "Re-adjusting RTP ts %u to %u",
356       gst_rtp_buffer_get_timestamp (rtpbuffer), ts);
357   gst_rtp_buffer_set_timestamp (rtpbuffer, ts);
358 }
359 
360 static gboolean
process_buffer_locked(GstRTPMux * rtp_mux,GstRTPMuxPadPrivate * padpriv,GstRTPBuffer * rtpbuffer)361 process_buffer_locked (GstRTPMux * rtp_mux, GstRTPMuxPadPrivate * padpriv,
362     GstRTPBuffer * rtpbuffer)
363 {
364   GstRTPMuxClass *klass = GST_RTP_MUX_GET_CLASS (rtp_mux);
365 
366   if (klass->accept_buffer_locked)
367     if (!klass->accept_buffer_locked (rtp_mux, padpriv, rtpbuffer))
368       return FALSE;
369 
370   rtp_mux->seqnum++;
371   gst_rtp_buffer_set_seq (rtpbuffer, rtp_mux->seqnum);
372 
373   gst_rtp_buffer_set_ssrc (rtpbuffer, rtp_mux->current_ssrc);
374   gst_rtp_mux_readjust_rtp_timestamp_locked (rtp_mux, padpriv, rtpbuffer);
375   GST_LOG_OBJECT (rtp_mux,
376       "Pushing packet size %" G_GSIZE_FORMAT ", seq=%d, ts=%u, ssrc=%x",
377       rtpbuffer->map[0].size, rtp_mux->seqnum,
378       gst_rtp_buffer_get_timestamp (rtpbuffer), rtp_mux->current_ssrc);
379 
380   if (padpriv) {
381     if (padpriv->segment.format == GST_FORMAT_TIME) {
382       GST_BUFFER_PTS (rtpbuffer->buffer) =
383           gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
384           GST_BUFFER_PTS (rtpbuffer->buffer));
385       GST_BUFFER_DTS (rtpbuffer->buffer) =
386           gst_segment_to_running_time (&padpriv->segment, GST_FORMAT_TIME,
387           GST_BUFFER_DTS (rtpbuffer->buffer));
388     }
389   }
390 
391   return TRUE;
392 }
393 
394 struct BufferListData
395 {
396   GstRTPMux *rtp_mux;
397   GstRTPMuxPadPrivate *padpriv;
398   gboolean drop;
399 };
400 
401 static gboolean
process_list_item(GstBuffer ** buffer,guint idx,gpointer user_data)402 process_list_item (GstBuffer ** buffer, guint idx, gpointer user_data)
403 {
404   struct BufferListData *bd = user_data;
405   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
406 
407   *buffer = gst_buffer_make_writable (*buffer);
408 
409   gst_rtp_buffer_map (*buffer, GST_MAP_READWRITE, &rtpbuffer);
410 
411   bd->drop = !process_buffer_locked (bd->rtp_mux, bd->padpriv, &rtpbuffer);
412 
413   gst_rtp_buffer_unmap (&rtpbuffer);
414 
415   if (bd->drop)
416     return FALSE;
417 
418   if (GST_BUFFER_DURATION_IS_VALID (*buffer) &&
419       GST_BUFFER_PTS_IS_VALID (*buffer))
420     bd->rtp_mux->last_stop = GST_BUFFER_PTS (*buffer) +
421         GST_BUFFER_DURATION (*buffer);
422   else
423     bd->rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
424 
425   return TRUE;
426 }
427 
428 static gboolean resend_events (GstPad * pad, GstEvent ** event,
429     gpointer user_data);
430 
431 static GstFlowReturn
gst_rtp_mux_chain_list(GstPad * pad,GstObject * parent,GstBufferList * bufferlist)432 gst_rtp_mux_chain_list (GstPad * pad, GstObject * parent,
433     GstBufferList * bufferlist)
434 {
435   GstRTPMux *rtp_mux;
436   GstFlowReturn ret;
437   GstRTPMuxPadPrivate *padpriv;
438   gboolean changed = FALSE;
439   struct BufferListData bd;
440 
441   rtp_mux = GST_RTP_MUX (parent);
442 
443   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
444     GstCaps *current_caps = gst_pad_get_current_caps (pad);
445 
446     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
447       gst_pad_mark_reconfigure (rtp_mux->srcpad);
448       if (GST_PAD_IS_FLUSHING (rtp_mux->srcpad))
449         ret = GST_FLOW_FLUSHING;
450       else
451         ret = GST_FLOW_NOT_NEGOTIATED;
452       gst_buffer_list_unref (bufferlist);
453       goto out;
454     }
455     gst_caps_unref (current_caps);
456   }
457 
458   GST_OBJECT_LOCK (rtp_mux);
459 
460   padpriv = gst_pad_get_element_private (pad);
461   if (!padpriv) {
462     GST_OBJECT_UNLOCK (rtp_mux);
463     ret = GST_FLOW_NOT_LINKED;
464     gst_buffer_list_unref (bufferlist);
465     goto out;
466   }
467 
468   bd.rtp_mux = rtp_mux;
469   bd.padpriv = padpriv;
470   bd.drop = FALSE;
471 
472   bufferlist = gst_buffer_list_make_writable (bufferlist);
473   gst_buffer_list_foreach (bufferlist, process_list_item, &bd);
474 
475   if (!bd.drop && pad != rtp_mux->last_pad) {
476     changed = TRUE;
477     g_clear_object (&rtp_mux->last_pad);
478     rtp_mux->last_pad = g_object_ref (pad);
479   }
480 
481   GST_OBJECT_UNLOCK (rtp_mux);
482 
483   if (changed)
484     gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
485 
486   if (bd.drop) {
487     gst_buffer_list_unref (bufferlist);
488     ret = GST_FLOW_OK;
489   } else {
490     ret = gst_pad_push_list (rtp_mux->srcpad, bufferlist);
491   }
492 
493 out:
494 
495   return ret;
496 }
497 
498 static gboolean
resend_events(GstPad * pad,GstEvent ** event,gpointer user_data)499 resend_events (GstPad * pad, GstEvent ** event, gpointer user_data)
500 {
501   GstRTPMux *rtp_mux = user_data;
502 
503   if (GST_EVENT_TYPE (*event) == GST_EVENT_CAPS) {
504     GstCaps *caps;
505 
506     gst_event_parse_caps (*event, &caps);
507     gst_rtp_mux_setcaps (pad, rtp_mux, caps);
508   } else if (GST_EVENT_TYPE (*event) == GST_EVENT_SEGMENT) {
509     GstSegment new_segment;
510     gst_segment_init (&new_segment, GST_FORMAT_TIME);
511     gst_pad_push_event (rtp_mux->srcpad, gst_event_new_segment (&new_segment));
512   } else {
513     gst_pad_push_event (rtp_mux->srcpad, gst_event_ref (*event));
514   }
515 
516   return TRUE;
517 }
518 
519 static GstFlowReturn
gst_rtp_mux_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)520 gst_rtp_mux_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
521 {
522   GstRTPMux *rtp_mux;
523   GstFlowReturn ret;
524   GstRTPMuxPadPrivate *padpriv;
525   gboolean drop;
526   gboolean changed = FALSE;
527   GstRTPBuffer rtpbuffer = GST_RTP_BUFFER_INIT;
528 
529   rtp_mux = GST_RTP_MUX (parent);
530 
531   if (gst_pad_check_reconfigure (rtp_mux->srcpad)) {
532     GstCaps *current_caps = gst_pad_get_current_caps (pad);
533 
534     if (!gst_rtp_mux_setcaps (pad, rtp_mux, current_caps)) {
535       gst_pad_mark_reconfigure (rtp_mux->srcpad);
536       if (GST_PAD_IS_FLUSHING (rtp_mux->srcpad))
537         ret = GST_FLOW_FLUSHING;
538       else
539         ret = GST_FLOW_NOT_NEGOTIATED;
540       gst_buffer_unref (buffer);
541       goto out;
542     }
543     gst_caps_unref (current_caps);
544   }
545 
546   GST_OBJECT_LOCK (rtp_mux);
547   padpriv = gst_pad_get_element_private (pad);
548 
549   if (!padpriv) {
550     GST_OBJECT_UNLOCK (rtp_mux);
551     gst_buffer_unref (buffer);
552     return GST_FLOW_NOT_LINKED;
553   }
554 
555   buffer = gst_buffer_make_writable (buffer);
556 
557   if (!gst_rtp_buffer_map (buffer, GST_MAP_READWRITE, &rtpbuffer)) {
558     GST_OBJECT_UNLOCK (rtp_mux);
559     gst_buffer_unref (buffer);
560     GST_ERROR_OBJECT (rtp_mux, "Invalid RTP buffer");
561     return GST_FLOW_ERROR;
562   }
563 
564   drop = !process_buffer_locked (rtp_mux, padpriv, &rtpbuffer);
565 
566   gst_rtp_buffer_unmap (&rtpbuffer);
567 
568   if (!drop) {
569     if (pad != rtp_mux->last_pad) {
570       changed = TRUE;
571       g_clear_object (&rtp_mux->last_pad);
572       rtp_mux->last_pad = g_object_ref (pad);
573     }
574 
575     if (GST_BUFFER_DURATION_IS_VALID (buffer) &&
576         GST_BUFFER_PTS_IS_VALID (buffer))
577       rtp_mux->last_stop = GST_BUFFER_PTS (buffer) +
578           GST_BUFFER_DURATION (buffer);
579     else
580       rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
581   }
582 
583   GST_OBJECT_UNLOCK (rtp_mux);
584 
585   if (changed)
586     gst_pad_sticky_events_foreach (pad, resend_events, rtp_mux);
587 
588   if (drop) {
589     gst_buffer_unref (buffer);
590     ret = GST_FLOW_OK;
591   } else {
592     ret = gst_pad_push (rtp_mux->srcpad, buffer);
593   }
594 
595 out:
596   return ret;
597 }
598 
599 static gboolean
gst_rtp_mux_setcaps(GstPad * pad,GstRTPMux * rtp_mux,GstCaps * caps)600 gst_rtp_mux_setcaps (GstPad * pad, GstRTPMux * rtp_mux, GstCaps * caps)
601 {
602   GstStructure *structure;
603   gboolean ret = FALSE;
604   GstRTPMuxPadPrivate *padpriv;
605   GstCaps *peercaps;
606 
607   if (caps == NULL)
608     return FALSE;
609 
610   if (!gst_caps_is_fixed (caps))
611     return FALSE;
612 
613   peercaps = gst_pad_peer_query_caps (rtp_mux->srcpad, NULL);
614   if (peercaps) {
615     GstCaps *tcaps, *othercaps;;
616     tcaps = gst_pad_get_pad_template_caps (pad);
617     othercaps = gst_caps_intersect_full (peercaps, tcaps,
618         GST_CAPS_INTERSECT_FIRST);
619 
620     if (gst_caps_get_size (othercaps) > 0) {
621       structure = gst_caps_get_structure (othercaps, 0);
622       GST_OBJECT_LOCK (rtp_mux);
623       if (gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc)) {
624         GST_INFO_OBJECT (pad, "Use downstream ssrc: %x", rtp_mux->current_ssrc);
625         rtp_mux->have_ssrc = TRUE;
626       }
627       if (gst_structure_get_uint (structure,
628               "timestamp-offset", &rtp_mux->ts_base)) {
629         GST_INFO_OBJECT (pad, "Use downstream timestamp-offset: %u",
630             rtp_mux->ts_base);
631       }
632       GST_OBJECT_UNLOCK (rtp_mux);
633     }
634 
635     gst_caps_unref (othercaps);
636 
637     gst_caps_unref (peercaps);
638     gst_caps_unref (tcaps);
639   }
640 
641   structure = gst_caps_get_structure (caps, 0);
642 
643   if (!structure)
644     return FALSE;
645 
646   GST_OBJECT_LOCK (rtp_mux);
647   padpriv = gst_pad_get_element_private (pad);
648   if (padpriv &&
649       gst_structure_get_uint (structure, "timestamp-offset",
650           &padpriv->timestamp_offset)) {
651     padpriv->have_timestamp_offset = TRUE;
652   }
653 
654   caps = gst_caps_copy (caps);
655 
656   /* if we don't have a specified ssrc, first try to take one from the caps,
657      and if that fails, generate one */
658   if (rtp_mux->ssrc == DEFAULT_SSRC) {
659     if (rtp_mux->current_ssrc == DEFAULT_SSRC) {
660       if (!gst_structure_get_uint (structure, "ssrc", &rtp_mux->current_ssrc)) {
661         rtp_mux->current_ssrc = g_random_int ();
662         GST_INFO_OBJECT (rtp_mux, "Set random ssrc %x", rtp_mux->current_ssrc);
663       }
664     }
665   } else {
666     rtp_mux->current_ssrc = rtp_mux->ssrc;
667     GST_INFO_OBJECT (rtp_mux, "Set ssrc %x", rtp_mux->current_ssrc);
668   }
669 
670   gst_caps_set_simple (caps,
671       "timestamp-offset", G_TYPE_UINT, rtp_mux->ts_base,
672       "seqnum-offset", G_TYPE_UINT, rtp_mux->seqnum_base,
673       "ssrc", G_TYPE_UINT, rtp_mux->current_ssrc, NULL);
674 
675   GST_OBJECT_UNLOCK (rtp_mux);
676 
677   if (rtp_mux->send_stream_start) {
678     gchar s_id[32];
679 
680     /* stream-start (FIXME: create id based on input ids) */
681     g_snprintf (s_id, sizeof (s_id), "interleave-%08x", g_random_int ());
682     gst_pad_push_event (rtp_mux->srcpad, gst_event_new_stream_start (s_id));
683 
684     rtp_mux->send_stream_start = FALSE;
685   }
686 
687   GST_DEBUG_OBJECT (rtp_mux,
688       "setting caps %" GST_PTR_FORMAT " on src pad..", caps);
689   ret = gst_pad_set_caps (rtp_mux->srcpad, caps);
690 
691 
692   gst_caps_unref (caps);
693 
694   return ret;
695 }
696 
697 static void
clear_caps(GstCaps * caps,gboolean only_clock_rate)698 clear_caps (GstCaps * caps, gboolean only_clock_rate)
699 {
700   gint i, j;
701 
702   /* Lets only match on the clock-rate */
703   for (i = 0; i < gst_caps_get_size (caps); i++) {
704     GstStructure *s = gst_caps_get_structure (caps, i);
705 
706     for (j = 0; j < gst_structure_n_fields (s); j++) {
707       const gchar *name = gst_structure_nth_field_name (s, j);
708 
709       if (strcmp (name, "clock-rate") && (only_clock_rate ||
710               (strcmp (name, "ssrc")))) {
711         gst_structure_remove_field (s, name);
712         j--;
713       }
714     }
715   }
716 }
717 
718 static gboolean
same_clock_rate_fold(const GValue * item,GValue * ret,gpointer user_data)719 same_clock_rate_fold (const GValue * item, GValue * ret, gpointer user_data)
720 {
721   GstPad *mypad = user_data;
722   GstPad *pad = g_value_get_object (item);
723   GstCaps *peercaps;
724   GstCaps *accumcaps;
725 
726   if (pad == mypad)
727     return TRUE;
728 
729   accumcaps = g_value_get_boxed (ret);
730   peercaps = gst_pad_peer_query_caps (pad, accumcaps);
731   if (!peercaps) {
732     g_warning ("no peercaps");
733     return TRUE;
734   }
735   peercaps = gst_caps_make_writable (peercaps);
736   clear_caps (peercaps, TRUE);
737 
738   g_value_take_boxed (ret, peercaps);
739 
740   return !gst_caps_is_empty (peercaps);
741 }
742 
743 static GstCaps *
gst_rtp_mux_getcaps(GstPad * pad,GstRTPMux * mux,GstCaps * filter)744 gst_rtp_mux_getcaps (GstPad * pad, GstRTPMux * mux, GstCaps * filter)
745 {
746   GstCaps *caps = NULL;
747   GstIterator *iter = NULL;
748   GValue v = { 0 };
749   GstIteratorResult res;
750   GstCaps *peercaps;
751   GstCaps *othercaps;
752   GstCaps *tcaps;
753   const GstStructure *structure;
754 
755   peercaps = gst_pad_peer_query_caps (mux->srcpad, NULL);
756 
757   if (peercaps) {
758     tcaps = gst_pad_get_pad_template_caps (pad);
759     othercaps = gst_caps_intersect_full (peercaps, tcaps,
760         GST_CAPS_INTERSECT_FIRST);
761     gst_caps_unref (peercaps);
762   } else {
763     tcaps = gst_pad_get_pad_template_caps (mux->srcpad);
764     if (filter)
765       othercaps = gst_caps_intersect_full (filter, tcaps,
766           GST_CAPS_INTERSECT_FIRST);
767     else
768       othercaps = gst_caps_copy (tcaps);
769   }
770   gst_caps_unref (tcaps);
771 
772   GST_LOG_OBJECT (pad, "Intersected srcpad-peercaps and template caps: %"
773       GST_PTR_FORMAT, othercaps);
774 
775   structure = gst_caps_get_structure (othercaps, 0);
776   if (mux->ssrc == DEFAULT_SSRC) {
777     if (gst_structure_get_uint (structure, "ssrc", &mux->current_ssrc))
778       GST_DEBUG_OBJECT (pad, "Use downstream ssrc: %x", mux->current_ssrc);
779   }
780 
781   clear_caps (othercaps, TRUE);
782 
783   g_value_init (&v, GST_TYPE_CAPS);
784 
785   iter = gst_element_iterate_sink_pads (GST_ELEMENT (mux));
786   do {
787     gst_value_set_caps (&v, othercaps);
788     res = gst_iterator_fold (iter, same_clock_rate_fold, &v, pad);
789     gst_iterator_resync (iter);
790   } while (res == GST_ITERATOR_RESYNC);
791   gst_iterator_free (iter);
792 
793   caps = gst_caps_intersect ((GstCaps *) gst_value_get_caps (&v), othercaps);
794 
795   g_value_unset (&v);
796   gst_caps_unref (othercaps);
797 
798   if (res == GST_ITERATOR_ERROR) {
799     gst_caps_unref (caps);
800     caps = gst_caps_new_empty ();
801   }
802 
803 
804   return caps;
805 }
806 
807 static gboolean
gst_rtp_mux_sink_query(GstPad * pad,GstObject * parent,GstQuery * query)808 gst_rtp_mux_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
809 {
810   GstRTPMux *mux = GST_RTP_MUX (parent);
811   gboolean res = FALSE;
812 
813   switch (GST_QUERY_TYPE (query)) {
814     case GST_QUERY_CAPS:
815     {
816       GstCaps *filter, *caps;
817 
818       gst_query_parse_caps (query, &filter);
819       GST_LOG_OBJECT (pad, "Received caps-query with filter-caps: %"
820           GST_PTR_FORMAT, filter);
821       caps = gst_rtp_mux_getcaps (pad, mux, filter);
822       gst_query_set_caps_result (query, caps);
823       GST_LOG_OBJECT (mux, "Answering caps-query with caps: %"
824           GST_PTR_FORMAT, caps);
825       gst_caps_unref (caps);
826       res = TRUE;
827       break;
828     }
829     default:
830       res = gst_pad_query_default (pad, parent, query);
831       break;
832   }
833 
834   return res;
835 }
836 
837 static void
gst_rtp_mux_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)838 gst_rtp_mux_get_property (GObject * object,
839     guint prop_id, GValue * value, GParamSpec * pspec)
840 {
841   GstRTPMux *rtp_mux;
842 
843   rtp_mux = GST_RTP_MUX (object);
844 
845   GST_OBJECT_LOCK (rtp_mux);
846   switch (prop_id) {
847     case PROP_TIMESTAMP_OFFSET:
848       g_value_set_int (value, rtp_mux->ts_offset);
849       break;
850     case PROP_SEQNUM_OFFSET:
851       g_value_set_int (value, rtp_mux->seqnum_offset);
852       break;
853     case PROP_SEQNUM:
854       g_value_set_uint (value, rtp_mux->seqnum);
855       break;
856     case PROP_SSRC:
857       g_value_set_uint (value, rtp_mux->ssrc);
858       break;
859     default:
860       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
861       break;
862   }
863   GST_OBJECT_UNLOCK (rtp_mux);
864 }
865 
866 static void
gst_rtp_mux_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)867 gst_rtp_mux_set_property (GObject * object,
868     guint prop_id, const GValue * value, GParamSpec * pspec)
869 {
870   GstRTPMux *rtp_mux;
871 
872   rtp_mux = GST_RTP_MUX (object);
873 
874   switch (prop_id) {
875     case PROP_TIMESTAMP_OFFSET:
876       rtp_mux->ts_offset = g_value_get_int (value);
877       break;
878     case PROP_SEQNUM_OFFSET:
879       rtp_mux->seqnum_offset = g_value_get_int (value);
880       break;
881     case PROP_SSRC:
882       GST_OBJECT_LOCK (rtp_mux);
883       rtp_mux->ssrc = g_value_get_uint (value);
884       rtp_mux->current_ssrc = rtp_mux->ssrc;
885       rtp_mux->have_ssrc = TRUE;
886       GST_DEBUG_OBJECT (rtp_mux, "ssrc prop set to %x", rtp_mux->ssrc);
887       GST_OBJECT_UNLOCK (rtp_mux);
888       break;
889     default:
890       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
891       break;
892   }
893 }
894 
895 static gboolean
gst_rtp_mux_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)896 gst_rtp_mux_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
897 {
898   GstRTPMux *mux = GST_RTP_MUX (parent);
899   gboolean is_pad;
900   gboolean ret;
901 
902   GST_OBJECT_LOCK (mux);
903   is_pad = (pad == mux->last_pad);
904   GST_OBJECT_UNLOCK (mux);
905 
906   switch (GST_EVENT_TYPE (event)) {
907     case GST_EVENT_CAPS:
908     {
909       GstCaps *caps;
910 
911       gst_event_parse_caps (event, &caps);
912       GST_LOG_OBJECT (pad, "Received caps-event with caps: %"
913           GST_PTR_FORMAT, caps);
914       ret = gst_rtp_mux_setcaps (pad, mux, caps);
915       gst_event_unref (event);
916       return ret;
917     }
918     case GST_EVENT_FLUSH_STOP:
919     {
920       GST_OBJECT_LOCK (mux);
921       mux->last_stop = GST_CLOCK_TIME_NONE;
922       GST_OBJECT_UNLOCK (mux);
923       break;
924     }
925     case GST_EVENT_SEGMENT:
926     {
927       GstRTPMuxPadPrivate *padpriv;
928 
929       GST_OBJECT_LOCK (mux);
930       padpriv = gst_pad_get_element_private (pad);
931 
932       if (padpriv) {
933         gst_event_copy_segment (event, &padpriv->segment);
934       }
935       GST_OBJECT_UNLOCK (mux);
936 
937       if (is_pad) {
938         GstSegment new_segment;
939         gst_segment_init (&new_segment, GST_FORMAT_TIME);
940         gst_event_unref (event);
941         event = gst_event_new_segment (&new_segment);
942       }
943       break;
944     }
945     default:
946       break;
947   }
948 
949   if (is_pad) {
950     return gst_pad_push_event (mux->srcpad, event);
951   } else {
952     gst_event_unref (event);
953     return TRUE;
954   }
955 }
956 
957 static void
gst_rtp_mux_ready_to_paused(GstRTPMux * rtp_mux)958 gst_rtp_mux_ready_to_paused (GstRTPMux * rtp_mux)
959 {
960 
961   GST_OBJECT_LOCK (rtp_mux);
962 
963   g_clear_object (&rtp_mux->last_pad);
964   rtp_mux->send_stream_start = TRUE;
965 
966   if (rtp_mux->seqnum_offset == -1)
967     rtp_mux->seqnum_base = g_random_int_range (0, G_MAXUINT16);
968   else
969     rtp_mux->seqnum_base = rtp_mux->seqnum_offset;
970   rtp_mux->seqnum = rtp_mux->seqnum_base;
971 
972   if (rtp_mux->ts_offset == -1)
973     rtp_mux->ts_base = g_random_int ();
974   else
975     rtp_mux->ts_base = rtp_mux->ts_offset;
976 
977   rtp_mux->last_stop = GST_CLOCK_TIME_NONE;
978 
979   if (rtp_mux->have_ssrc)
980     rtp_mux->current_ssrc = rtp_mux->ssrc;
981 
982   GST_DEBUG_OBJECT (rtp_mux, "set timestamp-offset to %u", rtp_mux->ts_base);
983 
984   GST_OBJECT_UNLOCK (rtp_mux);
985 }
986 
987 static GstStateChangeReturn
gst_rtp_mux_change_state(GstElement * element,GstStateChange transition)988 gst_rtp_mux_change_state (GstElement * element, GstStateChange transition)
989 {
990   GstRTPMux *rtp_mux;
991   GstStateChangeReturn ret;
992 
993   rtp_mux = GST_RTP_MUX (element);
994 
995   switch (transition) {
996     case GST_STATE_CHANGE_READY_TO_PAUSED:
997       gst_rtp_mux_ready_to_paused (rtp_mux);
998       break;
999     default:
1000       break;
1001   }
1002 
1003   ret = GST_ELEMENT_CLASS (gst_rtp_mux_parent_class)->change_state (element,
1004       transition);
1005 
1006   switch (transition) {
1007     case GST_STATE_CHANGE_PAUSED_TO_READY:
1008       g_clear_object (&rtp_mux->last_pad);
1009       break;
1010     default:
1011       break;
1012   }
1013 
1014   return ret;
1015 }
1016 
1017 gboolean
gst_rtp_mux_plugin_init(GstPlugin * plugin)1018 gst_rtp_mux_plugin_init (GstPlugin * plugin)
1019 {
1020   GST_DEBUG_CATEGORY_INIT (gst_rtp_mux_debug, "rtpmux", 0, "rtp muxer");
1021 
1022   return gst_element_register (plugin, "rtpmux", GST_RANK_NONE,
1023       GST_TYPE_RTP_MUX);
1024 }
1025