1 /* GStreamer
2  * Copyright (C) 2018 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19 
20 /**
21  * SECTION:gstwebrtc-datachannel
22  * @short_description: RTCDataChannel object
23  * @title: GstWebRTCDataChannel
24  * @see_also: #GstWebRTCRTPTransceiver
25  *
26  * <ulink url="http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport">http://w3c.github.io/webrtc-pc/#dom-rtcsctptransport</ulink>
27  */
28 
29 #ifdef HAVE_CONFIG_H
30 # include "config.h"
31 #endif
32 
33 #include "webrtcdatachannel.h"
34 #include <gst/app/gstappsink.h>
35 #include <gst/app/gstappsrc.h>
36 #include <gst/base/gstbytereader.h>
37 #include <gst/base/gstbytewriter.h>
38 #include <gst/sctp/sctpreceivemeta.h>
39 #include <gst/sctp/sctpsendmeta.h>
40 
41 #include "gstwebrtcbin.h"
42 #include "utils.h"
43 
44 #define GST_CAT_DEFAULT gst_webrtc_data_channel_debug
45 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
46 
47 #define gst_webrtc_data_channel_parent_class parent_class
48 G_DEFINE_TYPE_WITH_CODE (GstWebRTCDataChannel, gst_webrtc_data_channel,
49     GST_TYPE_OBJECT, GST_DEBUG_CATEGORY_INIT (gst_webrtc_data_channel_debug,
50         "webrtcdatachannel", 0, "webrtcdatachannel"););
51 
52 enum
53 {
54   SIGNAL_0,
55   SIGNAL_ON_OPEN,
56   SIGNAL_ON_CLOSE,
57   SIGNAL_ON_ERROR,
58   SIGNAL_ON_MESSAGE_DATA,
59   SIGNAL_ON_MESSAGE_STRING,
60   SIGNAL_ON_BUFFERED_AMOUNT_LOW,
61   SIGNAL_SEND_DATA,
62   SIGNAL_SEND_STRING,
63   SIGNAL_CLOSE,
64   LAST_SIGNAL,
65 };
66 
67 enum
68 {
69   PROP_0,
70   PROP_LABEL,
71   PROP_ORDERED,
72   PROP_MAX_PACKET_LIFETIME,
73   PROP_MAX_RETRANSMITS,
74   PROP_PROTOCOL,
75   PROP_NEGOTIATED,
76   PROP_ID,
77   PROP_PRIORITY,
78   PROP_READY_STATE,
79   PROP_BUFFERED_AMOUNT,
80   PROP_BUFFERED_AMOUNT_LOW_THRESHOLD,
81 };
82 
83 static guint gst_webrtc_data_channel_signals[LAST_SIGNAL] = { 0 };
84 
85 typedef enum
86 {
87   DATA_CHANNEL_PPID_WEBRTC_CONTROL = 50,
88   DATA_CHANNEL_PPID_WEBRTC_STRING = 51,
89   DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL = 52, /* deprecated */
90   DATA_CHANNEL_PPID_WEBRTC_BINARY = 53,
91   DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL = 54, /* deprecated */
92   DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY = 56,
93   DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY = 57,
94 } DataChannelPPID;
95 
96 typedef enum
97 {
98   CHANNEL_TYPE_RELIABLE = 0x00,
99   CHANNEL_TYPE_RELIABLE_UNORDERED = 0x80,
100   CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT = 0x01,
101   CHANNEL_TYPE_PARTIAL_RELIABLE_REXMIT_UNORDERED = 0x81,
102   CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED = 0x02,
103   CHANNEL_TYPE_PARTIAL_RELIABLE_TIMED_UNORDERED = 0x82,
104 } DataChannelReliabilityType;
105 
106 typedef enum
107 {
108   CHANNEL_MESSAGE_ACK = 0x02,
109   CHANNEL_MESSAGE_OPEN = 0x03,
110 } DataChannelMessage;
111 
112 static guint16
priority_type_to_uint(GstWebRTCPriorityType pri)113 priority_type_to_uint (GstWebRTCPriorityType pri)
114 {
115   switch (pri) {
116     case GST_WEBRTC_PRIORITY_TYPE_VERY_LOW:
117       return 64;
118     case GST_WEBRTC_PRIORITY_TYPE_LOW:
119       return 192;
120     case GST_WEBRTC_PRIORITY_TYPE_MEDIUM:
121       return 384;
122     case GST_WEBRTC_PRIORITY_TYPE_HIGH:
123       return 768;
124   }
125   g_assert_not_reached ();
126   return 0;
127 }
128 
129 static GstWebRTCPriorityType
priority_uint_to_type(guint16 val)130 priority_uint_to_type (guint16 val)
131 {
132   if (val <= 128)
133     return GST_WEBRTC_PRIORITY_TYPE_VERY_LOW;
134   if (val <= 256)
135     return GST_WEBRTC_PRIORITY_TYPE_LOW;
136   if (val <= 512)
137     return GST_WEBRTC_PRIORITY_TYPE_MEDIUM;
138   return GST_WEBRTC_PRIORITY_TYPE_HIGH;
139 }
140 
141 static GstBuffer *
construct_open_packet(GstWebRTCDataChannel * channel)142 construct_open_packet (GstWebRTCDataChannel * channel)
143 {
144   GstByteWriter w;
145   gsize label_len = strlen (channel->label);
146   gsize proto_len = strlen (channel->protocol);
147   gsize size = 12 + label_len + proto_len;
148   DataChannelReliabilityType reliability = 0;
149   guint32 reliability_param = 0;
150   guint16 priority;
151   GstBuffer *buf;
152 
153 /*
154  *    0                   1                   2                   3
155  *    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
156  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
157  *   |  Message Type |  Channel Type |            Priority           |
158  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
159  *   |                    Reliability Parameter                      |
160  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
161  *   |         Label Length          |       Protocol Length         |
162  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
163  *   \                                                               /
164  *   |                             Label                             |
165  *   /                                                               \
166  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
167  *   \                                                               /
168  *   |                            Protocol                           |
169  *   /                                                               \
170  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
171  */
172 
173   gst_byte_writer_init_with_size (&w, size, FALSE);
174 
175   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_OPEN))
176     g_return_val_if_reached (NULL);
177 
178   if (!channel->ordered)
179     reliability |= 0x80;
180   if (channel->max_retransmits != -1) {
181     reliability |= 0x01;
182     reliability_param = channel->max_retransmits;
183   }
184   if (channel->max_packet_lifetime != -1) {
185     reliability |= 0x02;
186     reliability_param = channel->max_packet_lifetime;
187   }
188 
189   priority = priority_type_to_uint (channel->priority);
190 
191   if (!gst_byte_writer_put_uint8 (&w, (guint8) reliability))
192     g_return_val_if_reached (NULL);
193   if (!gst_byte_writer_put_uint16_be (&w, (guint16) priority))
194     g_return_val_if_reached (NULL);
195   if (!gst_byte_writer_put_uint32_be (&w, (guint32) reliability_param))
196     g_return_val_if_reached (NULL);
197   if (!gst_byte_writer_put_uint16_be (&w, (guint16) label_len))
198     g_return_val_if_reached (NULL);
199   if (!gst_byte_writer_put_uint16_be (&w, (guint16) proto_len))
200     g_return_val_if_reached (NULL);
201   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->label, label_len))
202     g_return_val_if_reached (NULL);
203   if (!gst_byte_writer_put_data (&w, (guint8 *) channel->protocol, proto_len))
204     g_return_val_if_reached (NULL);
205 
206   buf = gst_byte_writer_reset_and_get_buffer (&w);
207 
208   /* send reliable and ordered */
209   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
210       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
211 
212   return buf;
213 }
214 
215 static GstBuffer *
construct_ack_packet(GstWebRTCDataChannel * channel)216 construct_ack_packet (GstWebRTCDataChannel * channel)
217 {
218   GstByteWriter w;
219   GstBuffer *buf;
220 
221 /*
222  *   0                   1                   2                   3
223  *   0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
224  *   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
225  *   |  Message Type |
226  *   +-+-+-+-+-+-+-+-+
227  */
228 
229   gst_byte_writer_init_with_size (&w, 1, FALSE);
230 
231   if (!gst_byte_writer_put_uint8 (&w, (guint8) CHANNEL_MESSAGE_ACK))
232     g_return_val_if_reached (NULL);
233 
234   buf = gst_byte_writer_reset_and_get_buffer (&w);
235 
236   /* send reliable and ordered */
237   gst_sctp_buffer_add_send_meta (buf, DATA_CHANNEL_PPID_WEBRTC_CONTROL, TRUE,
238       GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE, 0);
239 
240   return buf;
241 }
242 
243 typedef void (*ChannelTask) (GstWebRTCDataChannel * channel,
244     gpointer user_data);
245 
246 struct task
247 {
248   GstWebRTCDataChannel *channel;
249   ChannelTask func;
250   gpointer user_data;
251   GDestroyNotify notify;
252 };
253 
254 static void
_execute_task(GstWebRTCBin * webrtc,struct task * task)255 _execute_task (GstWebRTCBin * webrtc, struct task *task)
256 {
257   if (task->func)
258     task->func (task->channel, task->user_data);
259 }
260 
261 static void
_free_task(struct task * task)262 _free_task (struct task *task)
263 {
264   gst_object_unref (task->channel);
265 
266   if (task->notify)
267     task->notify (task->user_data);
268   g_free (task);
269 }
270 
271 static void
_channel_enqueue_task(GstWebRTCDataChannel * channel,ChannelTask func,gpointer user_data,GDestroyNotify notify)272 _channel_enqueue_task (GstWebRTCDataChannel * channel, ChannelTask func,
273     gpointer user_data, GDestroyNotify notify)
274 {
275   struct task *task = g_new0 (struct task, 1);
276 
277   task->channel = gst_object_ref (channel);
278   task->func = func;
279   task->user_data = user_data;
280   task->notify = notify;
281 
282   gst_webrtc_bin_enqueue_task (channel->webrtcbin,
283       (GstWebRTCBinFunc) _execute_task, task, (GDestroyNotify) _free_task);
284 }
285 
286 static void
_channel_store_error(GstWebRTCDataChannel * channel,GError * error)287 _channel_store_error (GstWebRTCDataChannel * channel, GError * error)
288 {
289   GST_OBJECT_LOCK (channel);
290   if (error) {
291     GST_WARNING_OBJECT (channel, "Error: %s",
292         error ? error->message : "Unknown");
293     if (!channel->stored_error)
294       channel->stored_error = error;
295     else
296       g_clear_error (&error);
297   }
298   GST_OBJECT_UNLOCK (channel);
299 }
300 
301 static void
_maybe_emit_on_error(GstWebRTCDataChannel * channel,GError * error)302 _maybe_emit_on_error (GstWebRTCDataChannel * channel, GError * error)
303 {
304   if (error) {
305     GST_WARNING_OBJECT (channel, "error thrown");
306     g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_ERROR], 0,
307         error);
308   }
309 }
310 
311 static void
_emit_on_open(GstWebRTCDataChannel * channel,gpointer user_data)312 _emit_on_open (GstWebRTCDataChannel * channel, gpointer user_data)
313 {
314   GST_OBJECT_LOCK (channel);
315   if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING ||
316       channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED) {
317     GST_OBJECT_UNLOCK (channel);
318     return;
319   }
320 
321   if (channel->ready_state != GST_WEBRTC_DATA_CHANNEL_STATE_OPEN) {
322     channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_OPEN;
323     GST_OBJECT_UNLOCK (channel);
324     g_object_notify (G_OBJECT (channel), "ready-state");
325 
326     GST_INFO_OBJECT (channel, "We are open and ready for data!");
327     g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_OPEN], 0,
328         NULL);
329   } else {
330     GST_OBJECT_UNLOCK (channel);
331   }
332 }
333 
334 static void
_transport_closed_unlocked(GstWebRTCDataChannel * channel)335 _transport_closed_unlocked (GstWebRTCDataChannel * channel)
336 {
337   GError *error;
338 
339   if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED)
340     return;
341 
342   channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED;
343 
344   error = channel->stored_error;
345   channel->stored_error = NULL;
346   GST_OBJECT_UNLOCK (channel);
347 
348   g_object_notify (G_OBJECT (channel), "ready-state");
349   GST_INFO_OBJECT (channel, "We are closed for data");
350 
351   _maybe_emit_on_error (channel, error);
352 
353   g_signal_emit (channel, gst_webrtc_data_channel_signals[SIGNAL_ON_CLOSE], 0,
354       NULL);
355   GST_OBJECT_LOCK (channel);
356 }
357 
358 static void
_transport_closed(GstWebRTCDataChannel * channel,gpointer user_data)359 _transport_closed (GstWebRTCDataChannel * channel, gpointer user_data)
360 {
361   GST_OBJECT_LOCK (channel);
362   _transport_closed_unlocked (channel);
363   GST_OBJECT_UNLOCK (channel);
364 }
365 
366 static void
_close_sctp_stream(GstWebRTCDataChannel * channel,gpointer user_data)367 _close_sctp_stream (GstWebRTCDataChannel * channel, gpointer user_data)
368 {
369   GstPad *pad, *peer;
370 
371   pad = gst_element_get_static_pad (channel->appsrc, "src");
372   peer = gst_pad_get_peer (pad);
373   gst_object_unref (pad);
374 
375   if (peer) {
376     GstElement *sctpenc = gst_pad_get_parent_element (peer);
377 
378     if (sctpenc) {
379       gst_element_release_request_pad (sctpenc, peer);
380       gst_object_unref (sctpenc);
381     }
382     gst_object_unref (peer);
383   }
384 
385   _transport_closed (channel, NULL);
386 }
387 
388 static void
_close_procedure(GstWebRTCDataChannel * channel,gpointer user_data)389 _close_procedure (GstWebRTCDataChannel * channel, gpointer user_data)
390 {
391   /* https://www.w3.org/TR/webrtc/#data-transport-closing-procedure */
392   GST_OBJECT_LOCK (channel);
393   if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSED
394       || channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING) {
395     GST_OBJECT_UNLOCK (channel);
396     return;
397   }
398   channel->ready_state = GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING;
399   GST_OBJECT_UNLOCK (channel);
400   g_object_notify (G_OBJECT (channel), "ready-state");
401 
402   GST_OBJECT_LOCK (channel);
403   if (channel->buffered_amount <= 0) {
404     _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream,
405         NULL, NULL);
406   }
407 
408   GST_OBJECT_UNLOCK (channel);
409 }
410 
411 static void
_on_sctp_reset_stream(GstWebRTCSCTPTransport * sctp,guint stream_id,GstWebRTCDataChannel * channel)412 _on_sctp_reset_stream (GstWebRTCSCTPTransport * sctp, guint stream_id,
413     GstWebRTCDataChannel * channel)
414 {
415   if (channel->id == stream_id)
416     _channel_enqueue_task (channel, (ChannelTask) _transport_closed,
417         GUINT_TO_POINTER (stream_id), NULL);
418 }
419 
420 static void
gst_webrtc_data_channel_close(GstWebRTCDataChannel * channel)421 gst_webrtc_data_channel_close (GstWebRTCDataChannel * channel)
422 {
423   _close_procedure (channel, NULL);
424 }
425 
426 static GstFlowReturn
_parse_control_packet(GstWebRTCDataChannel * channel,guint8 * data,gsize size,GError ** error)427 _parse_control_packet (GstWebRTCDataChannel * channel, guint8 * data,
428     gsize size, GError ** error)
429 {
430   GstByteReader r;
431   guint8 message_type;
432 
433   if (!data)
434     g_return_val_if_reached (GST_FLOW_ERROR);
435   if (size < 1)
436     g_return_val_if_reached (GST_FLOW_ERROR);
437 
438   gst_byte_reader_init (&r, data, size);
439 
440   if (!gst_byte_reader_get_uint8 (&r, &message_type))
441     g_return_val_if_reached (GST_FLOW_ERROR);
442 
443   if (message_type == CHANNEL_MESSAGE_ACK) {
444     /* all good */
445     GST_INFO_OBJECT (channel, "Received channel ack");
446     return GST_FLOW_OK;
447   } else if (message_type == CHANNEL_MESSAGE_OPEN) {
448     guint8 reliability;
449     guint32 reliability_param;
450     guint16 priority, label_len, proto_len;
451     const guint8 *src;
452     gchar *label, *proto;
453     GstBuffer *buffer;
454     GstFlowReturn ret;
455 
456     GST_INFO_OBJECT (channel, "Received channel open");
457 
458     if (channel->negotiated) {
459       g_set_error (error, GST_WEBRTC_BIN_ERROR,
460           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
461           "Data channel was signalled as negotiated already");
462       g_return_val_if_reached (GST_FLOW_ERROR);
463     }
464 
465     if (channel->opened)
466       return GST_FLOW_OK;
467 
468     if (!gst_byte_reader_get_uint8 (&r, &reliability))
469       goto parse_error;
470     if (!gst_byte_reader_get_uint16_be (&r, &priority))
471       goto parse_error;
472     if (!gst_byte_reader_get_uint32_be (&r, &reliability_param))
473       goto parse_error;
474     if (!gst_byte_reader_get_uint16_be (&r, &label_len))
475       goto parse_error;
476     if (!gst_byte_reader_get_uint16_be (&r, &proto_len))
477       goto parse_error;
478 
479     label = g_new0 (gchar, (gsize) label_len + 1);
480     proto = g_new0 (gchar, (gsize) proto_len + 1);
481 
482     if (!gst_byte_reader_get_data (&r, label_len, &src))
483       goto parse_error;
484     memcpy (label, src, label_len);
485     label[label_len] = '\0';
486     if (!gst_byte_reader_get_data (&r, proto_len, &src))
487       goto parse_error;
488     memcpy (proto, src, proto_len);
489     proto[proto_len] = '\0';
490 
491     channel->label = label;
492     channel->protocol = proto;
493     channel->priority = priority_uint_to_type (priority);
494     channel->ordered = !(reliability & 0x80);
495     if (reliability & 0x01) {
496       channel->max_retransmits = reliability_param;
497       channel->max_packet_lifetime = -1;
498     } else if (reliability & 0x02) {
499       channel->max_retransmits = -1;
500       channel->max_packet_lifetime = reliability_param;
501     } else {
502       channel->max_retransmits = -1;
503       channel->max_packet_lifetime = -1;
504     }
505     channel->opened = TRUE;
506 
507     GST_INFO_OBJECT (channel, "Received channel open for SCTP stream %i "
508         "label %s protocol %s ordered %s", channel->id, channel->label,
509         channel->protocol, channel->ordered ? "true" : "false");
510 
511     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
512 
513     GST_INFO_OBJECT (channel, "Sending channel ack");
514     buffer = construct_ack_packet (channel);
515 
516     GST_OBJECT_LOCK (channel);
517     channel->buffered_amount += gst_buffer_get_size (buffer);
518     GST_OBJECT_UNLOCK (channel);
519 
520     ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
521     if (ret != GST_FLOW_OK) {
522       g_set_error (error, GST_WEBRTC_BIN_ERROR,
523           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
524           "Could not send ack packet");
525     }
526     return ret;
527   } else {
528     g_set_error (error, GST_WEBRTC_BIN_ERROR,
529         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
530         "Unknown message type in control protocol");
531     return GST_FLOW_ERROR;
532   }
533 
534 parse_error:
535   {
536     g_set_error (error, GST_WEBRTC_BIN_ERROR,
537         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to parse packet");
538     g_return_val_if_reached (GST_FLOW_ERROR);
539   }
540 }
541 
542 static void
on_sink_eos(GstAppSink * sink,gpointer user_data)543 on_sink_eos (GstAppSink * sink, gpointer user_data)
544 {
545 }
546 
547 struct map_info
548 {
549   GstBuffer *buffer;
550   GstMapInfo map_info;
551 };
552 
553 static void
buffer_unmap_and_unref(struct map_info * info)554 buffer_unmap_and_unref (struct map_info *info)
555 {
556   gst_buffer_unmap (info->buffer, &info->map_info);
557   gst_buffer_unref (info->buffer);
558   g_free (info);
559 }
560 
561 static void
_emit_have_data(GstWebRTCDataChannel * channel,GBytes * data)562 _emit_have_data (GstWebRTCDataChannel * channel, GBytes * data)
563 {
564   GST_LOG_OBJECT (channel, "Have data %p", data);
565   g_signal_emit (channel,
566       gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_DATA], 0, data);
567 }
568 
569 static void
_emit_have_string(GstWebRTCDataChannel * channel,gchar * str)570 _emit_have_string (GstWebRTCDataChannel * channel, gchar * str)
571 {
572   GST_LOG_OBJECT (channel, "Have string %p", str);
573   g_signal_emit (channel,
574       gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_STRING], 0, str);
575 }
576 
577 static GstFlowReturn
_data_channel_have_sample(GstWebRTCDataChannel * channel,GstSample * sample,GError ** error)578 _data_channel_have_sample (GstWebRTCDataChannel * channel, GstSample * sample,
579     GError ** error)
580 {
581   GstSctpReceiveMeta *receive;
582   GstBuffer *buffer;
583   GstFlowReturn ret = GST_FLOW_OK;
584 
585   GST_LOG_OBJECT (channel, "Received sample %" GST_PTR_FORMAT, sample);
586 
587   g_return_val_if_fail (channel->sctp_transport != NULL, GST_FLOW_ERROR);
588 
589   buffer = gst_sample_get_buffer (sample);
590   if (!buffer) {
591     g_set_error (error, GST_WEBRTC_BIN_ERROR,
592         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "No buffer to handle");
593     return GST_FLOW_ERROR;
594   }
595   receive = gst_sctp_buffer_get_receive_meta (buffer);
596   if (!receive) {
597     g_set_error (error, GST_WEBRTC_BIN_ERROR,
598         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
599         "No SCTP Receive meta on the buffer");
600     return GST_FLOW_ERROR;
601   }
602 
603   switch (receive->ppid) {
604     case DATA_CHANNEL_PPID_WEBRTC_CONTROL:{
605       GstMapInfo info = GST_MAP_INFO_INIT;
606       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
607         g_set_error (error, GST_WEBRTC_BIN_ERROR,
608             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
609             "Failed to map received buffer");
610         ret = GST_FLOW_ERROR;
611       } else {
612         ret = _parse_control_packet (channel, info.data, info.size, error);
613       }
614       break;
615     }
616     case DATA_CHANNEL_PPID_WEBRTC_STRING:
617     case DATA_CHANNEL_PPID_WEBRTC_STRING_PARTIAL:{
618       GstMapInfo info = GST_MAP_INFO_INIT;
619       if (!gst_buffer_map (buffer, &info, GST_MAP_READ)) {
620         g_set_error (error, GST_WEBRTC_BIN_ERROR,
621             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
622             "Failed to map received buffer");
623         ret = GST_FLOW_ERROR;
624       } else {
625         gchar *str = g_strndup ((gchar *) info.data, info.size);
626         _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, str,
627             g_free);
628       }
629       break;
630     }
631     case DATA_CHANNEL_PPID_WEBRTC_BINARY:
632     case DATA_CHANNEL_PPID_WEBRTC_BINARY_PARTIAL:{
633       struct map_info *info = g_new0 (struct map_info, 1);
634       if (!gst_buffer_map (buffer, &info->map_info, GST_MAP_READ)) {
635         g_set_error (error, GST_WEBRTC_BIN_ERROR,
636             GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
637             "Failed to map received buffer");
638         ret = GST_FLOW_ERROR;
639       } else {
640         GBytes *data = g_bytes_new_with_free_func (info->map_info.data,
641             info->map_info.size, (GDestroyNotify) buffer_unmap_and_unref, info);
642         info->buffer = gst_buffer_ref (buffer);
643         _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, data,
644             (GDestroyNotify) g_bytes_unref);
645       }
646       break;
647     }
648     case DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY:
649       _channel_enqueue_task (channel, (ChannelTask) _emit_have_data, NULL,
650           NULL);
651       break;
652     case DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY:
653       _channel_enqueue_task (channel, (ChannelTask) _emit_have_string, NULL,
654           NULL);
655       break;
656     default:
657       g_set_error (error, GST_WEBRTC_BIN_ERROR,
658           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
659           "Unknown SCTP PPID %u received", receive->ppid);
660       ret = GST_FLOW_ERROR;
661       break;
662   }
663 
664   return ret;
665 }
666 
667 static GstFlowReturn
on_sink_preroll(GstAppSink * sink,gpointer user_data)668 on_sink_preroll (GstAppSink * sink, gpointer user_data)
669 {
670   GstWebRTCDataChannel *channel = user_data;
671   GstSample *sample = gst_app_sink_pull_preroll (sink);
672   GstFlowReturn ret;
673 
674   if (sample) {
675     /* This sample also seems to be provided by the sample callback
676        ret = _data_channel_have_sample (channel, sample); */
677     ret = GST_FLOW_OK;
678     gst_sample_unref (sample);
679   } else if (gst_app_sink_is_eos (sink)) {
680     ret = GST_FLOW_EOS;
681   } else {
682     ret = GST_FLOW_ERROR;
683   }
684 
685   if (ret != GST_FLOW_OK) {
686     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
687   }
688 
689   return ret;
690 }
691 
692 static GstFlowReturn
on_sink_sample(GstAppSink * sink,gpointer user_data)693 on_sink_sample (GstAppSink * sink, gpointer user_data)
694 {
695   GstWebRTCDataChannel *channel = user_data;
696   GstSample *sample = gst_app_sink_pull_sample (sink);
697   GstFlowReturn ret;
698   GError *error = NULL;
699 
700   if (sample) {
701     ret = _data_channel_have_sample (channel, sample, &error);
702     gst_sample_unref (sample);
703   } else if (gst_app_sink_is_eos (sink)) {
704     ret = GST_FLOW_EOS;
705   } else {
706     ret = GST_FLOW_ERROR;
707   }
708 
709   if (error)
710     _channel_store_error (channel, error);
711 
712   if (ret != GST_FLOW_OK) {
713     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
714   }
715 
716   return ret;
717 }
718 
719 static GstAppSinkCallbacks sink_callbacks = {
720   on_sink_eos,
721   on_sink_preroll,
722   on_sink_sample,
723 };
724 
725 void
gst_webrtc_data_channel_start_negotiation(GstWebRTCDataChannel * channel)726 gst_webrtc_data_channel_start_negotiation (GstWebRTCDataChannel * channel)
727 {
728   GstBuffer *buffer;
729 
730   g_return_if_fail (!channel->negotiated);
731   g_return_if_fail (channel->id != -1);
732   g_return_if_fail (channel->sctp_transport != NULL);
733 
734   buffer = construct_open_packet (channel);
735 
736   GST_INFO_OBJECT (channel, "Sending channel open for SCTP stream %i "
737       "label %s protocol %s ordered %s", channel->id, channel->label,
738       channel->protocol, channel->ordered ? "true" : "false");
739 
740   GST_OBJECT_LOCK (channel);
741   channel->buffered_amount += gst_buffer_get_size (buffer);
742   GST_OBJECT_UNLOCK (channel);
743 
744   if (gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc),
745           buffer) == GST_FLOW_OK) {
746     channel->opened = TRUE;
747     _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
748   } else {
749     GError *error = NULL;
750     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
751         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
752         "Failed to send DCEP open packet");
753     _channel_store_error (channel, error);
754     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
755   }
756 }
757 
758 static void
_get_sctp_reliability(GstWebRTCDataChannel * channel,GstSctpSendMetaPartiallyReliability * reliability,guint * rel_param)759 _get_sctp_reliability (GstWebRTCDataChannel * channel,
760     GstSctpSendMetaPartiallyReliability * reliability, guint * rel_param)
761 {
762   if (channel->max_retransmits != -1) {
763     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX;
764     *rel_param = channel->max_retransmits;
765   } else if (channel->max_packet_lifetime != -1) {
766     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL;
767     *rel_param = channel->max_packet_lifetime;
768   } else {
769     *reliability = GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE;
770     *rel_param = 0;
771   }
772 }
773 
774 static gboolean
_is_within_max_message_size(GstWebRTCDataChannel * channel,gsize size)775 _is_within_max_message_size (GstWebRTCDataChannel * channel, gsize size)
776 {
777   return size <= channel->sctp_transport->max_message_size;
778 }
779 
780 static void
gst_webrtc_data_channel_send_data(GstWebRTCDataChannel * channel,GBytes * bytes)781 gst_webrtc_data_channel_send_data (GstWebRTCDataChannel * channel,
782     GBytes * bytes)
783 {
784   GstSctpSendMetaPartiallyReliability reliability;
785   guint rel_param;
786   guint32 ppid;
787   GstBuffer *buffer;
788   GstFlowReturn ret;
789 
790   if (!bytes) {
791     buffer = gst_buffer_new ();
792     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY_EMPTY;
793   } else {
794     gsize size;
795     guint8 *data;
796 
797     data = (guint8 *) g_bytes_get_data (bytes, &size);
798     g_return_if_fail (data != NULL);
799     if (!_is_within_max_message_size (channel, size)) {
800       GError *error = NULL;
801       g_set_error (&error, GST_WEBRTC_BIN_ERROR,
802           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
803           "Requested to send data that is too large");
804       _channel_store_error (channel, error);
805       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
806           NULL);
807       return;
808     }
809 
810     buffer = gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, data, size,
811         0, size, g_bytes_ref (bytes), (GDestroyNotify) g_bytes_unref);
812     ppid = DATA_CHANNEL_PPID_WEBRTC_BINARY;
813   }
814 
815   _get_sctp_reliability (channel, &reliability, &rel_param);
816   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->ordered, reliability,
817       rel_param);
818 
819   GST_LOG_OBJECT (channel, "Sending data using buffer %" GST_PTR_FORMAT,
820       buffer);
821 
822   GST_OBJECT_LOCK (channel);
823   channel->buffered_amount += gst_buffer_get_size (buffer);
824   GST_OBJECT_UNLOCK (channel);
825 
826   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
827 
828   if (ret != GST_FLOW_OK) {
829     GError *error = NULL;
830     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
831         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send data");
832     _channel_store_error (channel, error);
833     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
834   }
835 }
836 
837 static void
gst_webrtc_data_channel_send_string(GstWebRTCDataChannel * channel,gchar * str)838 gst_webrtc_data_channel_send_string (GstWebRTCDataChannel * channel,
839     gchar * str)
840 {
841   GstSctpSendMetaPartiallyReliability reliability;
842   guint rel_param;
843   guint32 ppid;
844   GstBuffer *buffer;
845   GstFlowReturn ret;
846 
847   if (!channel->negotiated)
848     g_return_if_fail (channel->opened);
849   g_return_if_fail (channel->sctp_transport != NULL);
850 
851   if (!str) {
852     buffer = gst_buffer_new ();
853     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING_EMPTY;
854   } else {
855     gsize size = strlen (str);
856     gchar *str_copy = g_strdup (str);
857 
858     if (!_is_within_max_message_size (channel, size)) {
859       GError *error = NULL;
860       g_set_error (&error, GST_WEBRTC_BIN_ERROR,
861           GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE,
862           "Requested to send a string that is too large");
863       _channel_store_error (channel, error);
864       _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL,
865           NULL);
866       return;
867     }
868 
869     buffer =
870         gst_buffer_new_wrapped_full (GST_MEMORY_FLAG_READONLY, str_copy,
871         size, 0, size, str_copy, g_free);
872     ppid = DATA_CHANNEL_PPID_WEBRTC_STRING;
873   }
874 
875   _get_sctp_reliability (channel, &reliability, &rel_param);
876   gst_sctp_buffer_add_send_meta (buffer, ppid, channel->ordered, reliability,
877       rel_param);
878 
879   GST_TRACE_OBJECT (channel, "Sending string using buffer %" GST_PTR_FORMAT,
880       buffer);
881 
882   GST_OBJECT_LOCK (channel);
883   channel->buffered_amount += gst_buffer_get_size (buffer);
884   GST_OBJECT_UNLOCK (channel);
885 
886   ret = gst_app_src_push_buffer (GST_APP_SRC (channel->appsrc), buffer);
887 
888   if (ret != GST_FLOW_OK) {
889     GError *error = NULL;
890     g_set_error (&error, GST_WEBRTC_BIN_ERROR,
891         GST_WEBRTC_BIN_ERROR_DATA_CHANNEL_FAILURE, "Failed to send string");
892     _channel_store_error (channel, error);
893     _channel_enqueue_task (channel, (ChannelTask) _close_procedure, NULL, NULL);
894   }
895 }
896 
897 static void
_on_sctp_notify_state_unlocked(GObject * sctp_transport,GstWebRTCDataChannel * channel)898 _on_sctp_notify_state_unlocked (GObject * sctp_transport,
899     GstWebRTCDataChannel * channel)
900 {
901   GstWebRTCSCTPTransportState state;
902 
903   g_object_get (sctp_transport, "state", &state, NULL);
904   if (state == GST_WEBRTC_SCTP_TRANSPORT_STATE_CONNECTED) {
905     if (channel->negotiated)
906       _channel_enqueue_task (channel, (ChannelTask) _emit_on_open, NULL, NULL);
907   }
908 }
909 
910 static void
_on_sctp_notify_state(GObject * sctp_transport,GParamSpec * pspec,GstWebRTCDataChannel * channel)911 _on_sctp_notify_state (GObject * sctp_transport, GParamSpec * pspec,
912     GstWebRTCDataChannel * channel)
913 {
914   GST_OBJECT_LOCK (channel);
915   _on_sctp_notify_state_unlocked (sctp_transport, channel);
916   GST_OBJECT_UNLOCK (channel);
917 }
918 
919 void
gst_webrtc_data_channel_set_sctp_transport(GstWebRTCDataChannel * channel,GstWebRTCSCTPTransport * sctp)920 gst_webrtc_data_channel_set_sctp_transport (GstWebRTCDataChannel * channel,
921     GstWebRTCSCTPTransport * sctp)
922 {
923   g_return_if_fail (GST_IS_WEBRTC_DATA_CHANNEL (channel));
924   g_return_if_fail (GST_IS_WEBRTC_SCTP_TRANSPORT (sctp));
925 
926   GST_OBJECT_LOCK (channel);
927   if (channel->sctp_transport)
928     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
929 
930   gst_object_replace ((GstObject **) & channel->sctp_transport,
931       GST_OBJECT (sctp));
932 
933   if (sctp) {
934     g_signal_connect (sctp, "stream-reset", G_CALLBACK (_on_sctp_reset_stream),
935         channel);
936     g_signal_connect (sctp, "notify::state", G_CALLBACK (_on_sctp_notify_state),
937         channel);
938     _on_sctp_notify_state_unlocked (G_OBJECT (sctp), channel);
939   }
940   GST_OBJECT_UNLOCK (channel);
941 }
942 
943 static void
gst_webrtc_data_channel_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)944 gst_webrtc_data_channel_set_property (GObject * object, guint prop_id,
945     const GValue * value, GParamSpec * pspec)
946 {
947   GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object);
948 
949   GST_OBJECT_LOCK (channel);
950   switch (prop_id) {
951     case PROP_LABEL:
952       channel->label = g_value_dup_string (value);
953       break;
954     case PROP_ORDERED:
955       channel->ordered = g_value_get_boolean (value);
956       break;
957     case PROP_MAX_PACKET_LIFETIME:
958       channel->max_packet_lifetime = g_value_get_int (value);
959       break;
960     case PROP_MAX_RETRANSMITS:
961       channel->max_retransmits = g_value_get_int (value);
962       break;
963     case PROP_PROTOCOL:
964       channel->protocol = g_value_dup_string (value);
965       break;
966     case PROP_NEGOTIATED:
967       channel->negotiated = g_value_get_boolean (value);
968       break;
969     case PROP_ID:
970       channel->id = g_value_get_int (value);
971       break;
972     case PROP_PRIORITY:
973       channel->priority = g_value_get_enum (value);
974       break;
975     case PROP_BUFFERED_AMOUNT_LOW_THRESHOLD:
976       channel->buffered_amount_low_threshold = g_value_get_uint64 (value);
977       break;
978     default:
979       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
980       break;
981   }
982   GST_OBJECT_UNLOCK (channel);
983 }
984 
985 static void
gst_webrtc_data_channel_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)986 gst_webrtc_data_channel_get_property (GObject * object, guint prop_id,
987     GValue * value, GParamSpec * pspec)
988 {
989   GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object);
990 
991   GST_OBJECT_LOCK (channel);
992   switch (prop_id) {
993     case PROP_LABEL:
994       g_value_set_string (value, channel->label);
995       break;
996     case PROP_ORDERED:
997       g_value_set_boolean (value, channel->ordered);
998       break;
999     case PROP_MAX_PACKET_LIFETIME:
1000       g_value_set_int (value, channel->max_packet_lifetime);
1001       break;
1002     case PROP_MAX_RETRANSMITS:
1003       g_value_set_int (value, channel->max_retransmits);
1004       break;
1005     case PROP_PROTOCOL:
1006       g_value_set_string (value, channel->protocol);
1007       break;
1008     case PROP_NEGOTIATED:
1009       g_value_set_boolean (value, channel->negotiated);
1010       break;
1011     case PROP_ID:
1012       g_value_set_int (value, channel->id);
1013       break;
1014     case PROP_PRIORITY:
1015       g_value_set_enum (value, channel->priority);
1016       break;
1017     case PROP_READY_STATE:
1018       g_value_set_enum (value, channel->ready_state);
1019       break;
1020     case PROP_BUFFERED_AMOUNT:
1021       g_value_set_uint64 (value, channel->buffered_amount);
1022       break;
1023     case PROP_BUFFERED_AMOUNT_LOW_THRESHOLD:
1024       g_value_set_uint64 (value, channel->buffered_amount_low_threshold);
1025       break;
1026     default:
1027       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1028       break;
1029   }
1030   GST_OBJECT_UNLOCK (channel);
1031 }
1032 
1033 static void
_emit_low_threshold(GstWebRTCDataChannel * channel,gpointer user_data)1034 _emit_low_threshold (GstWebRTCDataChannel * channel, gpointer user_data)
1035 {
1036   GST_LOG_OBJECT (channel, "Low threshold reached");
1037   g_signal_emit (channel,
1038       gst_webrtc_data_channel_signals[SIGNAL_ON_BUFFERED_AMOUNT_LOW], 0);
1039 }
1040 
1041 static GstPadProbeReturn
on_appsrc_data(GstPad * pad,GstPadProbeInfo * info,gpointer user_data)1042 on_appsrc_data (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1043 {
1044   GstWebRTCDataChannel *channel = user_data;
1045   guint64 prev_amount;
1046   guint64 size = 0;
1047 
1048   if (GST_PAD_PROBE_INFO_TYPE (info) & (GST_PAD_PROBE_TYPE_BUFFER)) {
1049     GstBuffer *buffer = GST_PAD_PROBE_INFO_BUFFER (info);
1050     size = gst_buffer_get_size (buffer);
1051   } else if (GST_PAD_PROBE_INFO_TYPE (info) & GST_PAD_PROBE_TYPE_BUFFER_LIST) {
1052     GstBufferList *list = GST_PAD_PROBE_INFO_BUFFER_LIST (info);
1053     size = gst_buffer_list_calculate_size (list);
1054   }
1055 
1056   if (size > 0) {
1057     GST_OBJECT_LOCK (channel);
1058     prev_amount = channel->buffered_amount;
1059     channel->buffered_amount -= size;
1060     if (prev_amount > channel->buffered_amount_low_threshold &&
1061         channel->buffered_amount < channel->buffered_amount_low_threshold) {
1062       _channel_enqueue_task (channel, (ChannelTask) _emit_low_threshold,
1063           NULL, NULL);
1064     }
1065 
1066     if (channel->ready_state == GST_WEBRTC_DATA_CHANNEL_STATE_CLOSING
1067         && channel->buffered_amount <= 0) {
1068       _channel_enqueue_task (channel, (ChannelTask) _close_sctp_stream, NULL,
1069           NULL);
1070     }
1071     GST_OBJECT_UNLOCK (channel);
1072   }
1073 
1074   return GST_PAD_PROBE_OK;
1075 }
1076 
1077 static void
gst_webrtc_data_channel_constructed(GObject * object)1078 gst_webrtc_data_channel_constructed (GObject * object)
1079 {
1080   GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object);
1081   GstPad *pad;
1082   GstCaps *caps;
1083 
1084   caps = gst_caps_new_any ();
1085 
1086   channel->appsrc = gst_element_factory_make ("appsrc", NULL);
1087   gst_object_ref_sink (channel->appsrc);
1088   pad = gst_element_get_static_pad (channel->appsrc, "src");
1089 
1090   channel->src_probe = gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_DATA_BOTH,
1091       (GstPadProbeCallback) on_appsrc_data, channel, NULL);
1092 
1093   channel->appsink = gst_element_factory_make ("appsink", NULL);
1094   gst_object_ref_sink (channel->appsink);
1095   g_object_set (channel->appsink, "sync", FALSE, "async", FALSE, "caps", caps,
1096       NULL);
1097   gst_app_sink_set_callbacks (GST_APP_SINK (channel->appsink), &sink_callbacks,
1098       channel, NULL);
1099 
1100   gst_object_unref (pad);
1101   gst_caps_unref (caps);
1102 }
1103 
1104 static void
gst_webrtc_data_channel_finalize(GObject * object)1105 gst_webrtc_data_channel_finalize (GObject * object)
1106 {
1107   GstWebRTCDataChannel *channel = GST_WEBRTC_DATA_CHANNEL (object);
1108 
1109   if (channel->src_probe) {
1110     GstPad *pad = gst_element_get_static_pad (channel->appsrc, "src");
1111     gst_pad_remove_probe (pad, channel->src_probe);
1112     gst_object_unref (pad);
1113     channel->src_probe = 0;
1114   }
1115 
1116   g_free (channel->label);
1117   channel->label = NULL;
1118 
1119   g_free (channel->protocol);
1120   channel->protocol = NULL;
1121 
1122   if (channel->sctp_transport)
1123     g_signal_handlers_disconnect_by_data (channel->sctp_transport, channel);
1124   g_clear_object (&channel->sctp_transport);
1125 
1126   g_clear_object (&channel->appsrc);
1127   g_clear_object (&channel->appsink);
1128 
1129   G_OBJECT_CLASS (parent_class)->finalize (object);
1130 }
1131 
1132 static void
gst_webrtc_data_channel_class_init(GstWebRTCDataChannelClass * klass)1133 gst_webrtc_data_channel_class_init (GstWebRTCDataChannelClass * klass)
1134 {
1135   GObjectClass *gobject_class = (GObjectClass *) klass;
1136 
1137   gobject_class->constructed = gst_webrtc_data_channel_constructed;
1138   gobject_class->get_property = gst_webrtc_data_channel_get_property;
1139   gobject_class->set_property = gst_webrtc_data_channel_set_property;
1140   gobject_class->finalize = gst_webrtc_data_channel_finalize;
1141 
1142   g_object_class_install_property (gobject_class,
1143       PROP_LABEL,
1144       g_param_spec_string ("label",
1145           "Label", "Data channel label",
1146           NULL,
1147           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1148 
1149   g_object_class_install_property (gobject_class,
1150       PROP_ORDERED,
1151       g_param_spec_boolean ("ordered",
1152           "Ordered", "Using ordered transmission mode",
1153           FALSE,
1154           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1155 
1156   g_object_class_install_property (gobject_class,
1157       PROP_MAX_PACKET_LIFETIME,
1158       g_param_spec_int ("max-packet-lifetime",
1159           "Maximum Packet Lifetime",
1160           "Maximum number of milliseconds that transmissions and "
1161           "retransmissions may occur in unreliable mode (-1 = unset)",
1162           -1, G_MAXUINT16, -1,
1163           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1164 
1165   g_object_class_install_property (gobject_class,
1166       PROP_MAX_RETRANSMITS,
1167       g_param_spec_int ("max-retransmits",
1168           "Maximum Retransmits",
1169           "Maximum number of retransmissions attempted in unreliable mode",
1170           -1, G_MAXUINT16, 0,
1171           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1172 
1173   g_object_class_install_property (gobject_class,
1174       PROP_PROTOCOL,
1175       g_param_spec_string ("protocol",
1176           "Protocol", "Data channel protocol",
1177           "",
1178           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1179 
1180   g_object_class_install_property (gobject_class,
1181       PROP_NEGOTIATED,
1182       g_param_spec_boolean ("negotiated",
1183           "Negotiated",
1184           "Whether this data channel was negotiated by the application", FALSE,
1185           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1186 
1187   g_object_class_install_property (gobject_class,
1188       PROP_ID,
1189       g_param_spec_int ("id",
1190           "ID",
1191           "ID negotiated by this data channel (-1 = unset)",
1192           -1, G_MAXUINT16, -1,
1193           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1194 
1195   g_object_class_install_property (gobject_class,
1196       PROP_PRIORITY,
1197       g_param_spec_enum ("priority",
1198           "Priority",
1199           "The priority of data sent using this data channel",
1200           GST_TYPE_WEBRTC_PRIORITY_TYPE,
1201           GST_WEBRTC_PRIORITY_TYPE_LOW,
1202           G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
1203 
1204   g_object_class_install_property (gobject_class,
1205       PROP_READY_STATE,
1206       g_param_spec_enum ("ready-state",
1207           "Ready State",
1208           "The Ready state of this data channel",
1209           GST_TYPE_WEBRTC_DATA_CHANNEL_STATE,
1210           GST_WEBRTC_DATA_CHANNEL_STATE_NEW,
1211           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1212 
1213   g_object_class_install_property (gobject_class,
1214       PROP_BUFFERED_AMOUNT,
1215       g_param_spec_uint64 ("buffered-amount",
1216           "Buffered Amount",
1217           "The amount of data in bytes currently buffered",
1218           0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
1219 
1220   g_object_class_install_property (gobject_class,
1221       PROP_BUFFERED_AMOUNT_LOW_THRESHOLD,
1222       g_param_spec_uint64 ("buffered-amount-low-threshold",
1223           "Buffered Amount Low Threshold",
1224           "The threshold at which the buffered amount is considered low and "
1225           "the buffered-amount-low signal is emitted",
1226           0, G_MAXUINT64, 0, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
1227 
1228   /**
1229    * GstWebRTCDataChannel::on-open:
1230    * @object: the #GstWebRTCDataChannel
1231    */
1232   gst_webrtc_data_channel_signals[SIGNAL_ON_OPEN] =
1233       g_signal_new ("on-open", G_TYPE_FROM_CLASS (klass),
1234       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
1235       G_TYPE_NONE, 0);
1236 
1237   /**
1238    * GstWebRTCDataChannel::on-close:
1239    * @object: the #GstWebRTCDataChannel
1240    */
1241   gst_webrtc_data_channel_signals[SIGNAL_ON_CLOSE] =
1242       g_signal_new ("on-close", G_TYPE_FROM_CLASS (klass),
1243       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
1244       G_TYPE_NONE, 0);
1245 
1246   /**
1247    * GstWebRTCDataChannel::on-error:
1248    * @object: the #GstWebRTCDataChannel
1249    * @error: the #GError thrown
1250    */
1251   gst_webrtc_data_channel_signals[SIGNAL_ON_ERROR] =
1252       g_signal_new ("on-error", G_TYPE_FROM_CLASS (klass),
1253       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
1254       G_TYPE_NONE, 1, G_TYPE_ERROR);
1255 
1256   /**
1257    * GstWebRTCDataChannel::on-message-data:
1258    * @object: the #GstWebRTCDataChannel
1259    * @data: (nullable): a #GBytes of the data received
1260    */
1261   gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_DATA] =
1262       g_signal_new ("on-message-data", G_TYPE_FROM_CLASS (klass),
1263       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
1264       G_TYPE_NONE, 1, G_TYPE_BYTES);
1265 
1266   /**
1267    * GstWebRTCDataChannel::on-message-string:
1268    * @object: the #GstWebRTCDataChannel
1269    * @data: (nullable): the data received as a string
1270    */
1271   gst_webrtc_data_channel_signals[SIGNAL_ON_MESSAGE_STRING] =
1272       g_signal_new ("on-message-string", G_TYPE_FROM_CLASS (klass),
1273       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
1274       G_TYPE_NONE, 1, G_TYPE_STRING);
1275 
1276   /**
1277    * GstWebRTCDataChannel::on-buffered-amount-low:
1278    * @object: the #GstWebRTCDataChannel
1279    */
1280   gst_webrtc_data_channel_signals[SIGNAL_ON_BUFFERED_AMOUNT_LOW] =
1281       g_signal_new ("on-buffered-amount-low", G_TYPE_FROM_CLASS (klass),
1282       G_SIGNAL_RUN_LAST, 0, NULL, NULL, g_cclosure_marshal_generic,
1283       G_TYPE_NONE, 0);
1284 
1285   /**
1286    * GstWebRTCDataChannel::send-data:
1287    * @object: the #GstWebRTCDataChannel
1288    * @data: (nullable): a #GBytes with the data
1289    */
1290   gst_webrtc_data_channel_signals[SIGNAL_SEND_DATA] =
1291       g_signal_new_class_handler ("send-data", G_TYPE_FROM_CLASS (klass),
1292       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
1293       G_CALLBACK (gst_webrtc_data_channel_send_data), NULL, NULL,
1294       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BYTES);
1295 
1296   /**
1297    * GstWebRTCDataChannel::send-string:
1298    * @object: the #GstWebRTCDataChannel
1299    * @data: (nullable): a #GBytes with the data
1300    */
1301   gst_webrtc_data_channel_signals[SIGNAL_SEND_STRING] =
1302       g_signal_new_class_handler ("send-string", G_TYPE_FROM_CLASS (klass),
1303       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
1304       G_CALLBACK (gst_webrtc_data_channel_send_string), NULL, NULL,
1305       g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_STRING);
1306 
1307   /**
1308    * GstWebRTCDataChannel::close:
1309    * @object: the #GstWebRTCDataChannel
1310    *
1311    * Close the data channel
1312    */
1313   gst_webrtc_data_channel_signals[SIGNAL_CLOSE] =
1314       g_signal_new_class_handler ("close", G_TYPE_FROM_CLASS (klass),
1315       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
1316       G_CALLBACK (gst_webrtc_data_channel_close), NULL, NULL,
1317       g_cclosure_marshal_generic, G_TYPE_NONE, 0);
1318 }
1319 
1320 static void
gst_webrtc_data_channel_init(GstWebRTCDataChannel * channel)1321 gst_webrtc_data_channel_init (GstWebRTCDataChannel * channel)
1322 {
1323 }
1324