1 /* GStreamer
2 * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
13 *
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
18 */
19
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23
24 #include "transportsendbin.h"
25 #include "utils.h"
26
27 /*
28 * ,------------------------transport_send_%u-------------------------,
29 * ; ,-----dtlssrtpenc---, ;
30 * data_sink o--------------------------o data_sink ; ;
31 * ; ; ; ,---nicesink---, ;
32 * rtp_sink o--------------------------o rtp_sink_0 src o--o sink ; ;
33 * ; ; ; '--------------' ;
34 * ; ,--outputselector--, ,-o rtcp_sink_0 ; ;
35 * ; ; src_0 o-' '-------------------' ;
36 * rtcp_sink ;---o sink ; ,----dtlssrtpenc----, ,---nicesink---, ;
37 * ; ; src_1 o---o rtcp_sink_0 src o--o sink ; ;
38 * ; '------------------' '-------------------' '--------------' ;
39 * '------------------------------------------------------------------'
40 *
 * outputselector is used to switch between rtcp-mux and no rtcp-mux
42 *
43 * FIXME: Do we need a valve drop=TRUE for the no RTCP case?
44 */
45
46 #define GST_CAT_DEFAULT gst_webrtc_transport_send_bin_debug
47 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
48
49 #define transport_send_bin_parent_class parent_class
50 G_DEFINE_TYPE_WITH_CODE (TransportSendBin, transport_send_bin, GST_TYPE_BIN,
51 GST_DEBUG_CATEGORY_INIT (gst_webrtc_transport_send_bin_debug,
52 "webrtctransportsendbin", 0, "webrtctransportsendbin"););
53
54 static GstStaticPadTemplate rtp_sink_template =
55 GST_STATIC_PAD_TEMPLATE ("rtp_sink",
56 GST_PAD_SINK,
57 GST_PAD_ALWAYS,
58 GST_STATIC_CAPS ("application/x-rtp"));
59
60 static GstStaticPadTemplate rtcp_sink_template =
61 GST_STATIC_PAD_TEMPLATE ("rtcp_sink",
62 GST_PAD_SINK,
63 GST_PAD_ALWAYS,
64 GST_STATIC_CAPS ("application/x-rtp"));
65
66 static GstStaticPadTemplate data_sink_template =
67 GST_STATIC_PAD_TEMPLATE ("data_sink",
68 GST_PAD_SINK,
69 GST_PAD_ALWAYS,
70 GST_STATIC_CAPS_ANY);
71
/* Property identifiers handed to g_object_class_install_property() and
 * dispatched on in the get/set property handlers. */
enum
{
  PROP_0 = 0,
  PROP_STREAM,                  /* "stream" */
  PROP_RTCP_MUX                 /* "rtcp-mux" */
};
78
/* Accessors for the per-bin mutex stored in the `lock` member.
 *
 * The argument is parenthesized so that any pointer expression
 * (e.g. `ptr + 1` or `&obj`) can be passed safely. */
#define TSB_GET_LOCK(tsb) (&(tsb)->lock)
#define TSB_LOCK(tsb) (g_mutex_lock (TSB_GET_LOCK (tsb)))
#define TSB_UNLOCK(tsb) (g_mutex_unlock (TSB_GET_LOCK (tsb)))
82
83 static void cleanup_blocks (TransportSendBin * send);
84 static void tsb_remove_probe (struct pad_block *block);
85
86 static void
_set_rtcp_mux(TransportSendBin * send,gboolean rtcp_mux)87 _set_rtcp_mux (TransportSendBin * send, gboolean rtcp_mux)
88 {
89 GstPad *active_pad;
90
91 if (rtcp_mux)
92 active_pad = gst_element_get_static_pad (send->outputselector, "src_0");
93 else
94 active_pad = gst_element_get_static_pad (send->outputselector, "src_1");
95 send->rtcp_mux = rtcp_mux;
96 GST_OBJECT_UNLOCK (send);
97
98 g_object_set (send->outputselector, "active-pad", active_pad, NULL);
99
100 gst_object_unref (active_pad);
101 GST_OBJECT_LOCK (send);
102 }
103
104 static void
transport_send_bin_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)105 transport_send_bin_set_property (GObject * object, guint prop_id,
106 const GValue * value, GParamSpec * pspec)
107 {
108 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
109
110 GST_OBJECT_LOCK (send);
111 switch (prop_id) {
112 case PROP_STREAM:
113 /* XXX: weak-ref this? Note, it's construct-only so can't be changed later */
114 send->stream = TRANSPORT_STREAM (g_value_get_object (value));
115 break;
116 case PROP_RTCP_MUX:
117 _set_rtcp_mux (send, g_value_get_boolean (value));
118 break;
119 default:
120 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
121 break;
122 }
123 GST_OBJECT_UNLOCK (send);
124 }
125
126 static void
transport_send_bin_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)127 transport_send_bin_get_property (GObject * object, guint prop_id,
128 GValue * value, GParamSpec * pspec)
129 {
130 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
131
132 GST_OBJECT_LOCK (send);
133 switch (prop_id) {
134 case PROP_STREAM:
135 g_value_set_object (value, send->stream);
136 break;
137 case PROP_RTCP_MUX:
138 g_value_set_boolean (value, send->rtcp_mux);
139 break;
140 default:
141 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
142 break;
143 }
144 GST_OBJECT_UNLOCK (send);
145 }
146
147 static GstPadProbeReturn
pad_block(GstPad * pad,GstPadProbeInfo * info,gpointer unused)148 pad_block (GstPad * pad, GstPadProbeInfo * info, gpointer unused)
149 {
150 GST_LOG_OBJECT (pad, "blocking pad with data %" GST_PTR_FORMAT, info->data);
151
152 return GST_PAD_PROBE_OK;
153 }
154
155 /* We block RTP/RTCP dataflow until the relevant DTLS key
156 * nego is done, but we need to block the *peer* src pad
157 * because the dtlssrtpenc state changes are done manually,
158 * and otherwise we can get state change problems trying to shut down */
159 static struct pad_block *
block_peer_pad(GstElement * elem,const gchar * pad_name)160 block_peer_pad (GstElement * elem, const gchar * pad_name)
161 {
162 GstPad *pad, *peer;
163 struct pad_block *block;
164
165 pad = gst_element_get_static_pad (elem, pad_name);
166 peer = gst_pad_get_peer (pad);
167 block = _create_pad_block (elem, peer, 0, NULL, NULL);
168 block->block_id = gst_pad_add_probe (peer,
169 GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER |
170 GST_PAD_PROBE_TYPE_BUFFER_LIST, (GstPadProbeCallback) pad_block, NULL,
171 NULL);
172 gst_object_unref (pad);
173 gst_object_unref (peer);
174 return block;
175 }
176
177 static void
tsb_remove_probe(struct pad_block * block)178 tsb_remove_probe (struct pad_block *block)
179 {
180 if (block && block->block_id) {
181 gst_pad_remove_probe (block->pad, block->block_id);
182 block->block_id = 0;
183 }
184 }
185
186 static GstStateChangeReturn
transport_send_bin_change_state(GstElement * element,GstStateChange transition)187 transport_send_bin_change_state (GstElement * element,
188 GstStateChange transition)
189 {
190 TransportSendBin *send = TRANSPORT_SEND_BIN (element);
191 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
192
193 GST_DEBUG_OBJECT (element, "changing state: %s => %s",
194 gst_element_state_get_name (GST_STATE_TRANSITION_CURRENT (transition)),
195 gst_element_state_get_name (GST_STATE_TRANSITION_NEXT (transition)));
196
197 switch (transition) {
198 case GST_STATE_CHANGE_NULL_TO_READY:{
199 /* XXX: don't change state until the client-ness has been chosen
200 * arguably the element should be able to deal with this itself or
201 * we should only add it once/if we get the encoding keys */
202 TSB_LOCK (send);
203 gst_element_set_locked_state (send->rtp_ctx.dtlssrtpenc, TRUE);
204 gst_element_set_locked_state (send->rtcp_ctx.dtlssrtpenc, TRUE);
205 send->active = TRUE;
206 TSB_UNLOCK (send);
207 break;
208 }
209 case GST_STATE_CHANGE_READY_TO_PAUSED:{
210 GstElement *elem;
211
212 TSB_LOCK (send);
213 /* RTP */
214 /* unblock the encoder once the key is set, this should also be automatic */
215 elem = send->stream->transport->dtlssrtpenc;
216 send->rtp_ctx.rtp_block = block_peer_pad (elem, "rtp_sink_0");
217 /* Also block the RTCP pad on the RTP encoder, in case we mux RTCP */
218 send->rtp_ctx.rtcp_block = block_peer_pad (elem, "rtcp_sink_0");
219 /* unblock ice sink once a connection is made, this should also be automatic */
220 elem = send->stream->transport->transport->sink;
221 send->rtp_ctx.nice_block = block_peer_pad (elem, "sink");
222
223 /* RTCP */
224 elem = send->stream->rtcp_transport->dtlssrtpenc;
225 /* Block the RTCP DTLS encoder */
226 send->rtcp_ctx.rtcp_block = block_peer_pad (elem, "rtcp_sink_0");
227 /* unblock ice sink once a connection is made, this should also be automatic */
228 elem = send->stream->rtcp_transport->transport->sink;
229 send->rtcp_ctx.nice_block = block_peer_pad (elem, "sink");
230 TSB_UNLOCK (send);
231 break;
232 }
233 default:
234 break;
235 }
236
237 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
238 if (ret == GST_STATE_CHANGE_FAILURE) {
239 GST_WARNING_OBJECT (element, "Parent state change handler failed");
240 return ret;
241 }
242
243 switch (transition) {
244 case GST_STATE_CHANGE_PAUSED_TO_READY:
245 {
246 /* Now that everything is stopped, we can remove the pad blocks
247 * if they still exist, without accidentally feeding data to the
248 * dtlssrtpenc elements */
249 TSB_LOCK (send);
250 tsb_remove_probe (send->rtp_ctx.rtp_block);
251 tsb_remove_probe (send->rtp_ctx.rtcp_block);
252 tsb_remove_probe (send->rtp_ctx.nice_block);
253
254 tsb_remove_probe (send->rtcp_ctx.rtcp_block);
255 tsb_remove_probe (send->rtcp_ctx.nice_block);
256 TSB_UNLOCK (send);
257 break;
258 }
259 case GST_STATE_CHANGE_READY_TO_NULL:{
260 TSB_LOCK (send);
261 send->active = FALSE;
262 cleanup_blocks (send);
263
264 gst_element_set_locked_state (send->rtp_ctx.dtlssrtpenc, FALSE);
265 gst_element_set_locked_state (send->rtcp_ctx.dtlssrtpenc, FALSE);
266 TSB_UNLOCK (send);
267
268 break;
269 }
270 default:
271 break;
272 }
273
274 return ret;
275 }
276
277 static void
_on_dtls_enc_key_set(GstElement * dtlssrtpenc,TransportSendBin * send)278 _on_dtls_enc_key_set (GstElement * dtlssrtpenc, TransportSendBin * send)
279 {
280 TransportSendBinDTLSContext *ctx;
281
282 if (dtlssrtpenc == send->rtp_ctx.dtlssrtpenc)
283 ctx = &send->rtp_ctx;
284 else if (dtlssrtpenc == send->rtcp_ctx.dtlssrtpenc)
285 ctx = &send->rtcp_ctx;
286 else {
287 GST_WARNING_OBJECT (send,
288 "Received dtls-enc key info for unknown element %" GST_PTR_FORMAT,
289 dtlssrtpenc);
290 return;
291 }
292
293 TSB_LOCK (send);
294 if (!send->active) {
295 GST_INFO_OBJECT (send, "Received dtls-enc key info from %" GST_PTR_FORMAT
296 "when not active", dtlssrtpenc);
297 goto done;
298 }
299
300 GST_LOG_OBJECT (send, "Unblocking %" GST_PTR_FORMAT " pads", dtlssrtpenc);
301 _free_pad_block (ctx->rtp_block);
302 _free_pad_block (ctx->rtcp_block);
303 ctx->rtp_block = ctx->rtcp_block = NULL;
304
305 done:
306 TSB_UNLOCK (send);
307 }
308
309 static void
_on_notify_dtls_client_status(GstElement * dtlssrtpenc,GParamSpec * pspec,TransportSendBin * send)310 _on_notify_dtls_client_status (GstElement * dtlssrtpenc,
311 GParamSpec * pspec, TransportSendBin * send)
312 {
313 TransportSendBinDTLSContext *ctx;
314 if (dtlssrtpenc == send->rtp_ctx.dtlssrtpenc)
315 ctx = &send->rtp_ctx;
316 else if (dtlssrtpenc == send->rtcp_ctx.dtlssrtpenc)
317 ctx = &send->rtcp_ctx;
318 else {
319 GST_WARNING_OBJECT (send,
320 "Received dtls-enc client mode for unknown element %" GST_PTR_FORMAT,
321 dtlssrtpenc);
322 return;
323 }
324
325 TSB_LOCK (send);
326 if (!send->active) {
327 GST_DEBUG_OBJECT (send,
328 "DTLS-SRTP encoder ready after we're already stopping");
329 goto done;
330 }
331
332 GST_DEBUG_OBJECT (send,
333 "DTLS-SRTP encoder configured. Unlocking it and changing state %"
334 GST_PTR_FORMAT, ctx->dtlssrtpenc);
335 gst_element_set_locked_state (ctx->dtlssrtpenc, FALSE);
336 gst_element_sync_state_with_parent (ctx->dtlssrtpenc);
337 done:
338 TSB_UNLOCK (send);
339 }
340
341 static void
_on_notify_ice_connection_state(GstWebRTCICETransport * transport,GParamSpec * pspec,TransportSendBin * send)342 _on_notify_ice_connection_state (GstWebRTCICETransport * transport,
343 GParamSpec * pspec, TransportSendBin * send)
344 {
345 GstWebRTCICEConnectionState state;
346
347 g_object_get (transport, "state", &state, NULL);
348
349 if (state == GST_WEBRTC_ICE_CONNECTION_STATE_CONNECTED ||
350 state == GST_WEBRTC_ICE_CONNECTION_STATE_COMPLETED) {
351 TSB_LOCK (send);
352 if (transport == send->stream->transport->transport) {
353 if (send->rtp_ctx.nice_block) {
354 GST_LOG_OBJECT (send, "Unblocking pad %" GST_PTR_FORMAT,
355 send->rtp_ctx.nice_block->pad);
356 _free_pad_block (send->rtp_ctx.nice_block);
357 send->rtp_ctx.nice_block = NULL;
358 }
359 } else if (transport == send->stream->rtcp_transport->transport) {
360 if (send->rtcp_ctx.nice_block) {
361 GST_LOG_OBJECT (send, "Unblocking pad %" GST_PTR_FORMAT,
362 send->rtcp_ctx.nice_block->pad);
363 _free_pad_block (send->rtcp_ctx.nice_block);
364 send->rtcp_ctx.nice_block = NULL;
365 }
366 }
367 TSB_UNLOCK (send);
368 }
369 }
370
371 static void
tsb_setup_ctx(TransportSendBin * send,TransportSendBinDTLSContext * ctx,GstWebRTCDTLSTransport * transport)372 tsb_setup_ctx (TransportSendBin * send, TransportSendBinDTLSContext * ctx,
373 GstWebRTCDTLSTransport * transport)
374 {
375 GstElement *dtlssrtpenc, *nicesink;
376
377 dtlssrtpenc = ctx->dtlssrtpenc = transport->dtlssrtpenc;
378 nicesink = ctx->nicesink = transport->transport->sink;
379
380 /* unblock the encoder once the key is set */
381 g_signal_connect (dtlssrtpenc, "on-key-set",
382 G_CALLBACK (_on_dtls_enc_key_set), send);
383 /* Bring the encoder up to current state only once the is-client prop is set */
384 g_signal_connect (dtlssrtpenc, "notify::is-client",
385 G_CALLBACK (_on_notify_dtls_client_status), send);
386 gst_bin_add (GST_BIN (send), GST_ELEMENT (dtlssrtpenc));
387
388 /* unblock ice sink once it signals a connection */
389 g_signal_connect (transport->transport, "notify::state",
390 G_CALLBACK (_on_notify_ice_connection_state), send);
391 gst_bin_add (GST_BIN (send), GST_ELEMENT (nicesink));
392
393 if (!gst_element_link_pads (GST_ELEMENT (dtlssrtpenc), "src", nicesink,
394 "sink"))
395 g_warn_if_reached ();
396 }
397
398 static void
transport_send_bin_constructed(GObject * object)399 transport_send_bin_constructed (GObject * object)
400 {
401 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
402 GstWebRTCDTLSTransport *transport;
403 GstPadTemplate *templ;
404 GstPad *ghost, *pad;
405
406 g_return_if_fail (send->stream);
407
408 g_object_bind_property (send, "rtcp-mux", send->stream, "rtcp-mux",
409 G_BINDING_BIDIRECTIONAL);
410
411 /* Output selector to direct the RTCP for muxed-mode */
412 send->outputselector = gst_element_factory_make ("output-selector", NULL);
413 gst_bin_add (GST_BIN (send), send->outputselector);
414
415 /* RTP */
416 transport = send->stream->transport;
417 /* Do the common init for the context struct */
418 tsb_setup_ctx (send, &send->rtp_ctx, transport);
419
420 templ = _find_pad_template (transport->dtlssrtpenc,
421 GST_PAD_SINK, GST_PAD_REQUEST, "rtp_sink_%d");
422 pad = gst_element_request_pad (transport->dtlssrtpenc, templ, "rtp_sink_0",
423 NULL);
424
425 if (!gst_element_link_pads (GST_ELEMENT (send->outputselector), "src_0",
426 GST_ELEMENT (transport->dtlssrtpenc), "rtcp_sink_0"))
427 g_warn_if_reached ();
428
429 ghost = gst_ghost_pad_new ("rtp_sink", pad);
430 gst_element_add_pad (GST_ELEMENT (send), ghost);
431 gst_object_unref (pad);
432
433 /* push the data stream onto the RTP dtls element */
434 templ = _find_pad_template (transport->dtlssrtpenc,
435 GST_PAD_SINK, GST_PAD_REQUEST, "data_sink");
436 pad = gst_element_request_pad (transport->dtlssrtpenc, templ, "data_sink",
437 NULL);
438
439 ghost = gst_ghost_pad_new ("data_sink", pad);
440 gst_element_add_pad (GST_ELEMENT (send), ghost);
441 gst_object_unref (pad);
442
443 /* RTCP */
444 transport = send->stream->rtcp_transport;
445 /* Do the common init for the context struct */
446 tsb_setup_ctx (send, &send->rtcp_ctx, transport);
447 templ = _find_pad_template (transport->dtlssrtpenc,
448 GST_PAD_SINK, GST_PAD_REQUEST, "rtcp_sink_%d");
449
450 if (!gst_element_link_pads (GST_ELEMENT (send->outputselector), "src_1",
451 GST_ELEMENT (transport->dtlssrtpenc), "rtcp_sink_0"))
452 g_warn_if_reached ();
453
454 pad = gst_element_get_static_pad (send->outputselector, "sink");
455
456 ghost = gst_ghost_pad_new ("rtcp_sink", pad);
457 gst_element_add_pad (GST_ELEMENT (send), ghost);
458 gst_object_unref (pad);
459
460 G_OBJECT_CLASS (parent_class)->constructed (object);
461 }
462
463 static void
cleanup_ctx_blocks(TransportSendBinDTLSContext * ctx)464 cleanup_ctx_blocks (TransportSendBinDTLSContext * ctx)
465 {
466 if (ctx->rtp_block) {
467 _free_pad_block (ctx->rtp_block);
468 ctx->rtp_block = NULL;
469 }
470
471 if (ctx->rtcp_block) {
472 _free_pad_block (ctx->rtcp_block);
473 ctx->rtcp_block = NULL;
474 }
475
476 if (ctx->nice_block) {
477 _free_pad_block (ctx->nice_block);
478 ctx->nice_block = NULL;
479 }
480 }
481
482 static void
cleanup_blocks(TransportSendBin * send)483 cleanup_blocks (TransportSendBin * send)
484 {
485 cleanup_ctx_blocks (&send->rtp_ctx);
486 cleanup_ctx_blocks (&send->rtcp_ctx);
487 }
488
489 static void
transport_send_bin_dispose(GObject * object)490 transport_send_bin_dispose (GObject * object)
491 {
492 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
493
494 TSB_LOCK (send);
495 if (send->rtp_ctx.nicesink) {
496 g_signal_handlers_disconnect_by_data (send->rtp_ctx.nicesink, send);
497 send->rtp_ctx.nicesink = NULL;
498 }
499 if (send->rtcp_ctx.nicesink) {
500 g_signal_handlers_disconnect_by_data (send->rtcp_ctx.nicesink, send);
501 send->rtcp_ctx.nicesink = NULL;
502 }
503 cleanup_blocks (send);
504
505 TSB_UNLOCK (send);
506
507 G_OBJECT_CLASS (parent_class)->dispose (object);
508 }
509
510 static void
transport_send_bin_finalize(GObject * object)511 transport_send_bin_finalize (GObject * object)
512 {
513 TransportSendBin *send = TRANSPORT_SEND_BIN (object);
514
515 g_mutex_clear (TSB_GET_LOCK (send));
516 G_OBJECT_CLASS (parent_class)->finalize (object);
517 }
518
519 static void
transport_send_bin_class_init(TransportSendBinClass * klass)520 transport_send_bin_class_init (TransportSendBinClass * klass)
521 {
522 GObjectClass *gobject_class = (GObjectClass *) klass;
523 GstElementClass *element_class = (GstElementClass *) klass;
524
525 element_class->change_state = transport_send_bin_change_state;
526
527 gst_element_class_add_static_pad_template (element_class, &rtp_sink_template);
528 gst_element_class_add_static_pad_template (element_class,
529 &rtcp_sink_template);
530 gst_element_class_add_static_pad_template (element_class,
531 &data_sink_template);
532
533 gst_element_class_set_metadata (element_class, "WebRTC Transport Send Bin",
534 "Filter/Network/WebRTC", "A bin for webrtc connections",
535 "Matthew Waters <matthew@centricular.com>");
536
537 gobject_class->constructed = transport_send_bin_constructed;
538 gobject_class->dispose = transport_send_bin_dispose;
539 gobject_class->get_property = transport_send_bin_get_property;
540 gobject_class->set_property = transport_send_bin_set_property;
541 gobject_class->finalize = transport_send_bin_finalize;
542
543 g_object_class_install_property (gobject_class,
544 PROP_STREAM,
545 g_param_spec_object ("stream", "Stream",
546 "The TransportStream for this sending bin",
547 transport_stream_get_type (),
548 G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
549
550 g_object_class_install_property (gobject_class,
551 PROP_RTCP_MUX,
552 g_param_spec_boolean ("rtcp-mux", "RTCP Mux",
553 "Whether RTCP packets are muxed with RTP packets",
554 FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
555 }
556
557 static void
transport_send_bin_init(TransportSendBin * send)558 transport_send_bin_init (TransportSendBin * send)
559 {
560 g_mutex_init (TSB_GET_LOCK (send));
561 }
562