1 /*
2 * Copyright (c) 2015, Collabora Ltd.
3 *
4 * Redistribution and use in source and binary forms, with or without modification,
5 * are permitted provided that the following conditions are met:
6 *
7 * 1. Redistributions of source code must retain the above copyright notice, this
8 * list of conditions and the following disclaimer.
9 *
10 * 2. Redistributions in binary form must reproduce the above copyright notice, this
11 * list of conditions and the following disclaimer in the documentation and/or other
12 * materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17 * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18 * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23 * OF SUCH DAMAGE.
24 */
25
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 #include "gstsctpenc.h"
30
31 #include <gst/sctp/sctpsendmeta.h>
32 #include <stdio.h>
33
34 GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
35 #define GST_CAT_DEFAULT gst_sctp_enc_debug_category
36
37 #define gst_sctp_enc_parent_class parent_class
38 G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
39
40 static GstStaticPadTemplate sink_template =
41 GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
42 GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);
43
44 static GstStaticPadTemplate src_template =
45 GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
46 GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
47
48 enum
49 {
50 SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,
51 SIGNAL_GET_STREAM_BYTES_SENT,
52 NUM_SIGNALS
53 };
54
55 static guint signals[NUM_SIGNALS];
56
57 enum
58 {
59 PROP_0,
60
61 PROP_GST_SCTP_ASSOCIATION_ID,
62 PROP_REMOTE_SCTP_PORT,
63 PROP_USE_SOCK_STREAM,
64
65 NUM_PROPERTIES
66 };
67
68 static GParamSpec *properties[NUM_PROPERTIES];
69
70 #define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
71 #define DEFAULT_REMOTE_SCTP_PORT 0
72 #define DEFAULT_GST_SCTP_ORDERED TRUE
73 #define DEFAULT_SCTP_PPID 1
74 #define DEFAULT_USE_SOCK_STREAM FALSE
75
76 #define BUFFER_FULL_SLEEP_TIME 100000
77
78 GType gst_sctp_enc_pad_get_type (void);
79
80 #define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
81 #define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
82 #define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
83 #define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
84 #define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))
85
86 typedef struct _GstSctpEncPad GstSctpEncPad;
87 typedef GstPadClass GstSctpEncPadClass;
88
89 struct _GstSctpEncPad
90 {
91 GstPad parent;
92
93 guint16 stream_id;
94 gboolean ordered;
95 guint32 ppid;
96 GstSctpAssociationPartialReliability reliability;
97 guint32 reliability_param;
98
99 guint64 bytes_sent;
100
101 GMutex lock;
102 GCond cond;
103 gboolean flushing;
104 };
105
106 G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
107
108 static void
gst_sctp_enc_pad_finalize(GObject * object)109 gst_sctp_enc_pad_finalize (GObject * object)
110 {
111 GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);
112
113 g_cond_clear (&self->cond);
114 g_mutex_clear (&self->lock);
115
116 G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
117 }
118
119 static void
gst_sctp_enc_pad_class_init(GstSctpEncPadClass * klass)120 gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
121 {
122 GObjectClass *gobject_class = (GObjectClass *) klass;
123
124 gobject_class->finalize = gst_sctp_enc_pad_finalize;
125 }
126
127 static void
gst_sctp_enc_pad_init(GstSctpEncPad * self)128 gst_sctp_enc_pad_init (GstSctpEncPad * self)
129 {
130 g_mutex_init (&self->lock);
131 g_cond_init (&self->cond);
132 self->flushing = FALSE;
133 }
134
135 static void gst_sctp_enc_finalize (GObject * object);
136 static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
137 const GValue * value, GParamSpec * pspec);
138 static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
139 GValue * value, GParamSpec * pspec);
140 static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
141 GstStateChange transition);
142 static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
143 GstPadTemplate * template, const gchar * name, const GstCaps * caps);
144 static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
145 static void gst_sctp_enc_srcpad_loop (GstPad * pad);
146 static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
147 GstBuffer * buffer);
148 static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
149 GstEvent * event);
150 static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
151 GstEvent * event);
152 static void on_sctp_association_state_changed (GstSctpAssociation *
153 sctp_association, GParamSpec * pspec, GstSctpEnc * self);
154
155 static gboolean configure_association (GstSctpEnc * self);
156 static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
157 const guint8 * buf, gsize length, gpointer user_data);
158 static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
159 static void sctpenc_cleanup (GstSctpEnc * self);
160 static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
161 GstSctpAssociationPartialReliability * reliability,
162 guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
163 static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);
164
165 static void
gst_sctp_enc_class_init(GstSctpEncClass * klass)166 gst_sctp_enc_class_init (GstSctpEncClass * klass)
167 {
168 GObjectClass *gobject_class;
169 GstElementClass *element_class;
170
171 gobject_class = (GObjectClass *) klass;
172 element_class = (GstElementClass *) klass;
173
174 GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
175 "sctpenc", 0, "debug category for sctpenc element");
176
177 gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
178 gst_static_pad_template_get (&src_template));
179 gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
180 gst_static_pad_template_get (&sink_template));
181
182 gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
183 gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
184 gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);
185
186 element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
187 element_class->request_new_pad =
188 GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
189 element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);
190
191 properties[PROP_GST_SCTP_ASSOCIATION_ID] =
192 g_param_spec_uint ("sctp-association-id",
193 "SCTP Association ID",
194 "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
195 "This value must be set before any pads are requested.",
196 0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
197 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
198
199 properties[PROP_REMOTE_SCTP_PORT] =
200 g_param_spec_uint ("remote-sctp-port",
201 "Remote SCTP port",
202 "Sctp remote sctp port for the sctp association. The local port is configured via the "
203 "GstSctpDec element.",
204 0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
205 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
206
207 properties[PROP_USE_SOCK_STREAM] =
208 g_param_spec_boolean ("use-sock-stream",
209 "Use sock-stream",
210 "When set to TRUE, a sequenced, reliable, connection-based connection is used."
211 "When TRUE the partial reliability parameters of the channel are ignored.",
212 DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
213
214 g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
215
216 signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
217 g_signal_new ("sctp-association-established",
218 G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
219 G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
220 NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
221
222 signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
223 G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
224 G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
225 g_cclosure_marshal_generic, G_TYPE_UINT64, 1, G_TYPE_UINT);
226
227 klass->on_get_stream_bytes_sent =
228 GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);
229
230 gst_element_class_set_static_metadata (element_class,
231 "SCTP Encoder",
232 "Encoder/Network/SCTP",
233 "Encodes packets with SCTP",
234 "George Kiagiadakis <george.kiagiadakis@collabora.com>");
235 }
236
237 static gboolean
data_queue_check_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer user_data)238 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
239 guint64 time, gpointer user_data)
240 {
241 /* TODO: When are we considered full? */
242 return FALSE;
243 }
244
245 static void
data_queue_empty_cb(GstDataQueue * queue,gpointer user_data)246 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
247 {
248 }
249
250 static void
data_queue_full_cb(GstDataQueue * queue,gpointer user_data)251 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
252 {
253 }
254
255 static void
gst_sctp_enc_init(GstSctpEnc * self)256 gst_sctp_enc_init (GstSctpEnc * self)
257 {
258 self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
259 self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;
260
261 self->sctp_association = NULL;
262 self->outbound_sctp_packet_queue =
263 gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
264 data_queue_empty_cb, NULL);
265
266 self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
267 gst_pad_set_event_function (self->src_pad,
268 GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
269 gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
270
271 g_queue_init (&self->pending_pads);
272 }
273
274 static void
gst_sctp_enc_finalize(GObject * object)275 gst_sctp_enc_finalize (GObject * object)
276 {
277 GstSctpEnc *self = GST_SCTP_ENC (object);
278
279 g_queue_clear (&self->pending_pads);
280 gst_object_unref (self->outbound_sctp_packet_queue);
281
282 G_OBJECT_CLASS (parent_class)->finalize (object);
283 }
284
285 static void
gst_sctp_enc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)286 gst_sctp_enc_set_property (GObject * object, guint prop_id,
287 const GValue * value, GParamSpec * pspec)
288 {
289 GstSctpEnc *self = GST_SCTP_ENC (object);
290
291 switch (prop_id) {
292 case PROP_GST_SCTP_ASSOCIATION_ID:
293 self->sctp_association_id = g_value_get_uint (value);
294 break;
295 case PROP_REMOTE_SCTP_PORT:
296 self->remote_sctp_port = g_value_get_uint (value);
297 break;
298 case PROP_USE_SOCK_STREAM:
299 self->use_sock_stream = g_value_get_boolean (value);
300 break;
301 default:
302 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
303 break;
304 }
305 }
306
307 static void
gst_sctp_enc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)308 gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
309 GParamSpec * pspec)
310 {
311 GstSctpEnc *self = GST_SCTP_ENC (object);
312
313 switch (prop_id) {
314 case PROP_GST_SCTP_ASSOCIATION_ID:
315 g_value_set_uint (value, self->sctp_association_id);
316 break;
317 case PROP_REMOTE_SCTP_PORT:
318 g_value_set_uint (value, self->remote_sctp_port);
319 break;
320 case PROP_USE_SOCK_STREAM:
321 g_value_set_boolean (value, self->use_sock_stream);
322 break;
323 default:
324 G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
325 break;
326 }
327 }
328
329 static GstStateChangeReturn
gst_sctp_enc_change_state(GstElement * element,GstStateChange transition)330 gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
331 {
332 GstSctpEnc *self = GST_SCTP_ENC (element);
333 GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
334 gboolean res = TRUE;
335
336 switch (transition) {
337 case GST_STATE_CHANGE_NULL_TO_READY:
338 break;
339 case GST_STATE_CHANGE_READY_TO_PAUSED:
340 self->need_segment = self->need_stream_start_caps = TRUE;
341 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
342 res = configure_association (self);
343 break;
344 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
345 break;
346 case GST_STATE_CHANGE_PAUSED_TO_READY:
347 sctpenc_cleanup (self);
348 break;
349 case GST_STATE_CHANGE_READY_TO_NULL:
350 break;
351 default:
352 break;
353 }
354
355 if (res)
356 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
357
358 switch (transition) {
359 case GST_STATE_CHANGE_NULL_TO_READY:
360 break;
361 case GST_STATE_CHANGE_READY_TO_PAUSED:
362 gst_pad_start_task (self->src_pad,
363 (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
364 break;
365 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
366 break;
367 case GST_STATE_CHANGE_PAUSED_TO_READY:
368 break;
369 case GST_STATE_CHANGE_READY_TO_NULL:
370 break;
371 default:
372 break;
373 }
374
375 return ret;
376 }
377
378 static GstPad *
gst_sctp_enc_request_new_pad(GstElement * element,GstPadTemplate * template,const gchar * new_pad_name,const GstCaps * caps)379 gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
380 const gchar * new_pad_name, const GstCaps * caps)
381 {
382 GstSctpEnc *self = GST_SCTP_ENC (element);
383 GstPad *new_pad = NULL;
384 GstSctpEncPad *sctpenc_pad;
385 guint32 stream_id;
386 gint state;
387 guint32 new_ppid;
388 gboolean is_new_ppid;
389
390 g_object_get (self->sctp_association, "state", &state, NULL);
391
392 if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
393 g_warning
394 ("The SCTP association must be established before a new stream can be created");
395 goto invalid_state;
396 }
397
398 if (!template)
399 goto invalid_parameter;
400
401 if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
402 || stream_id > 65534) /* 65535 is not a valid stream id */
403 goto invalid_parameter;
404
405 new_pad = gst_element_get_static_pad (element, new_pad_name);
406 if (new_pad) {
407 gst_object_unref (new_pad);
408 new_pad = NULL;
409 goto invalid_parameter;
410 }
411
412 new_pad =
413 g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
414 template->direction, "template", template, NULL);
415 gst_pad_set_chain_function (new_pad,
416 GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
417 gst_pad_set_event_function (new_pad,
418 GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));
419
420 sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
421 sctpenc_pad->stream_id = stream_id;
422 sctpenc_pad->ppid = DEFAULT_SCTP_PPID;
423
424 if (caps) {
425 get_config_from_caps (caps, &sctpenc_pad->ordered,
426 &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
427 &is_new_ppid);
428
429 if (is_new_ppid)
430 sctpenc_pad->ppid = new_ppid;
431 }
432
433 sctpenc_pad->flushing = FALSE;
434
435 if (!gst_pad_set_active (new_pad, TRUE))
436 goto error_cleanup;
437
438 if (!gst_element_add_pad (element, new_pad))
439 goto error_cleanup;
440
441 invalid_state:
442 invalid_parameter:
443 return new_pad;
444 error_cleanup:
445 gst_object_unref (new_pad);
446 return NULL;
447 }
448
449 static void
gst_sctp_enc_release_pad(GstElement * element,GstPad * pad)450 gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
451 {
452 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
453 GstSctpEnc *self;
454 guint stream_id = 0;
455
456 self = GST_SCTP_ENC (element);
457
458 g_mutex_lock (&sctpenc_pad->lock);
459 sctpenc_pad->flushing = TRUE;
460 g_cond_signal (&sctpenc_pad->cond);
461 g_mutex_unlock (&sctpenc_pad->lock);
462
463 stream_id = sctpenc_pad->stream_id;
464 gst_pad_set_active (pad, FALSE);
465
466 if (self->sctp_association)
467 gst_sctp_association_reset_stream (self->sctp_association, stream_id);
468
469 gst_element_remove_pad (element, pad);
470 }
471
472 static void
gst_sctp_enc_srcpad_loop(GstPad * pad)473 gst_sctp_enc_srcpad_loop (GstPad * pad)
474 {
475 GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
476 GstFlowReturn flow_ret;
477 GstDataQueueItem *item;
478
479 if (self->need_stream_start_caps) {
480 gchar s_id[32];
481 GstCaps *caps;
482
483 g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
484 gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));
485
486 caps = gst_caps_new_empty_simple ("application/x-sctp");
487 gst_pad_set_caps (self->src_pad, caps);
488 gst_caps_unref (caps);
489
490 self->need_stream_start_caps = FALSE;
491 }
492
493 if (self->need_segment) {
494 GstSegment segment;
495
496 gst_segment_init (&segment, GST_FORMAT_BYTES);
497 gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));
498
499 self->need_segment = FALSE;
500 }
501
502 if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
503 flow_ret = gst_pad_push (self->src_pad, GST_BUFFER (item->object));
504 item->object = NULL;
505
506 if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
507 || flow_ret == GST_FLOW_NOT_LINKED)) {
508 GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
509 gst_flow_get_name (flow_ret));
510 } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
511 GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
512 gst_flow_get_name (flow_ret));
513 }
514
515 if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
516 GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
517 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
518 gst_data_queue_flush (self->outbound_sctp_packet_queue);
519 gst_pad_pause_task (pad);
520 }
521
522 item->destroy (item);
523 } else {
524 GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
525 gst_pad_pause_task (pad);
526 }
527 }
528
529 static GstFlowReturn
gst_sctp_enc_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)530 gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
531 {
532 GstSctpEnc *self = GST_SCTP_ENC (parent);
533 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
534 GstMapInfo map;
535 guint32 ppid;
536 gboolean ordered;
537 GstSctpAssociationPartialReliability pr;
538 guint32 pr_param;
539 gpointer state = NULL;
540 GstMeta *meta;
541 const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
542 GstFlowReturn flow_ret = GST_FLOW_ERROR;
543
544 ppid = sctpenc_pad->ppid;
545 ordered = sctpenc_pad->ordered;
546 pr = sctpenc_pad->reliability;
547 pr_param = sctpenc_pad->reliability_param;
548
549 while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
550 if (meta->info->api == meta_info->api) {
551 GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;
552
553 ppid = sctp_send_meta->ppid;
554 ordered = sctp_send_meta->ordered;
555 pr_param = sctp_send_meta->pr_param;
556 switch (sctp_send_meta->pr) {
557 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
558 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
559 break;
560 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
561 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
562 break;
563 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
564 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
565 break;
566 case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
567 pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
568 break;
569 }
570 break;
571 }
572 }
573
574 if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
575 g_warning ("Could not map GstBuffer");
576 goto error;
577 }
578
579 g_mutex_lock (&sctpenc_pad->lock);
580 while (!sctpenc_pad->flushing) {
581 gboolean data_sent = FALSE;
582
583 g_mutex_unlock (&sctpenc_pad->lock);
584
585 data_sent =
586 gst_sctp_association_send_data (self->sctp_association, map.data,
587 map.size, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param);
588
589 g_mutex_lock (&sctpenc_pad->lock);
590 if (data_sent) {
591 sctpenc_pad->bytes_sent += map.size;
592 break;
593 } else if (!sctpenc_pad->flushing) {
594 gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
595
596 /* The buffer was probably full. Retry in a while */
597 GST_OBJECT_LOCK (self);
598 g_queue_push_tail (&self->pending_pads, sctpenc_pad);
599 GST_OBJECT_UNLOCK (self);
600
601 g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
602
603 GST_OBJECT_LOCK (self);
604 g_queue_remove (&self->pending_pads, sctpenc_pad);
605 GST_OBJECT_UNLOCK (self);
606 }
607 }
608 flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
609 g_mutex_unlock (&sctpenc_pad->lock);
610
611 gst_buffer_unmap (buffer, &map);
612 error:
613 gst_buffer_unref (buffer);
614 return flow_ret;
615 }
616
617 static gboolean
gst_sctp_enc_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)618 gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
619 {
620 GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
621 gboolean ret, is_new_ppid;
622 guint32 new_ppid;
623
624 switch (GST_EVENT_TYPE (event)) {
625 case GST_EVENT_CAPS:{
626 GstCaps *caps;
627
628 gst_event_parse_caps (event, &caps);
629 get_config_from_caps (caps, &sctpenc_pad->ordered,
630 &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
631 &is_new_ppid);
632 if (is_new_ppid)
633 sctpenc_pad->ppid = new_ppid;
634 gst_event_unref (event);
635 ret = TRUE;
636 break;
637 }
638 case GST_EVENT_STREAM_START:
639 case GST_EVENT_SEGMENT:
640 /* Drop these, we create our own */
641 ret = TRUE;
642 gst_event_unref (event);
643 break;
644 case GST_EVENT_EOS:
645 /* Drop this, we're never EOS until shut down */
646 ret = TRUE;
647 gst_event_unref (event);
648 break;
649 case GST_EVENT_FLUSH_START:
650 g_mutex_lock (&sctpenc_pad->lock);
651 sctpenc_pad->flushing = TRUE;
652 g_cond_signal (&sctpenc_pad->cond);
653 g_mutex_unlock (&sctpenc_pad->lock);
654
655 ret = gst_pad_event_default (pad, parent, event);
656 break;
657 case GST_EVENT_FLUSH_STOP:
658 sctpenc_pad->flushing = FALSE;
659 ret = gst_pad_event_default (pad, parent, event);
660 break;
661 default:
662 ret = gst_pad_event_default (pad, parent, event);
663 break;
664 }
665 return ret;
666 }
667
668 static void
flush_sinkpad(const GValue * item,gpointer user_data)669 flush_sinkpad (const GValue * item, gpointer user_data)
670 {
671 GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
672 gboolean flush = GPOINTER_TO_INT (user_data);
673
674 if (flush) {
675 g_mutex_lock (&sctpenc_pad->lock);
676 sctpenc_pad->flushing = TRUE;
677 g_cond_signal (&sctpenc_pad->cond);
678 g_mutex_unlock (&sctpenc_pad->lock);
679 } else {
680 sctpenc_pad->flushing = FALSE;
681 }
682 }
683
684 static gboolean
gst_sctp_enc_src_event(GstPad * pad,GstObject * parent,GstEvent * event)685 gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
686 {
687 GstSctpEnc *self = GST_SCTP_ENC (parent);
688 gboolean ret;
689
690 switch (GST_EVENT_TYPE (event)) {
691 case GST_EVENT_FLUSH_START:{
692 GstIterator *it;
693
694 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
695 gst_data_queue_flush (self->outbound_sctp_packet_queue);
696
697 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
698 while (gst_iterator_foreach (it, flush_sinkpad,
699 GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
700 gst_iterator_resync (it);
701 gst_iterator_free (it);
702
703 ret = gst_pad_event_default (pad, parent, event);
704 break;
705 }
706 case GST_EVENT_RECONFIGURE:
707 case GST_EVENT_FLUSH_STOP:{
708 GstIterator *it;
709
710 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
711 while (gst_iterator_foreach (it, flush_sinkpad,
712 GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
713 gst_iterator_resync (it);
714 gst_iterator_free (it);
715
716 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
717 self->need_segment = TRUE;
718 gst_pad_start_task (self->src_pad,
719 (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
720
721 ret = gst_pad_event_default (pad, parent, event);
722 break;
723 }
724 default:
725 ret = gst_pad_event_default (pad, parent, event);
726 break;
727 }
728 return ret;
729 }
730
731 static gboolean
configure_association(GstSctpEnc * self)732 configure_association (GstSctpEnc * self)
733 {
734 gint state;
735
736 self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
737
738 g_object_get (self->sctp_association, "state", &state, NULL);
739
740 if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
741 GST_WARNING_OBJECT (self,
742 "Could not configure SCTP association. Association already in use!");
743 g_object_unref (self->sctp_association);
744 self->sctp_association = NULL;
745 goto error;
746 }
747
748 self->signal_handler_state_changed =
749 g_signal_connect_object (self->sctp_association, "notify::state",
750 G_CALLBACK (on_sctp_association_state_changed), self, 0);
751
752 g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
753 "remote-port", G_BINDING_SYNC_CREATE);
754
755 g_object_bind_property (self, "use-sock-stream", self->sctp_association,
756 "use-sock-stream", G_BINDING_SYNC_CREATE);
757
758 gst_sctp_association_set_on_packet_out (self->sctp_association,
759 on_sctp_packet_out, self);
760
761 return TRUE;
762 error:
763 return FALSE;
764 }
765
766 static void
on_sctp_association_state_changed(GstSctpAssociation * sctp_association,GParamSpec * pspec,GstSctpEnc * self)767 on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
768 GParamSpec * pspec, GstSctpEnc * self)
769 {
770 gint state;
771
772 g_object_get (sctp_association, "state", &state, NULL);
773 switch (state) {
774 case GST_SCTP_ASSOCIATION_STATE_NEW:
775 break;
776 case GST_SCTP_ASSOCIATION_STATE_READY:
777 gst_sctp_association_start (sctp_association);
778 break;
779 case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
780 break;
781 case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
782 g_signal_emit_by_name (self, "sctp-association-established", TRUE);
783 break;
784 case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
785 g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
786 FALSE);
787 break;
788 case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
789 break;
790 case GST_SCTP_ASSOCIATION_STATE_ERROR:
791 break;
792 }
793 }
794
795 static void
data_queue_item_free(GstDataQueueItem * item)796 data_queue_item_free (GstDataQueueItem * item)
797 {
798 if (item->object)
799 gst_mini_object_unref (item->object);
800 g_free (item);
801 }
802
803 static void
on_sctp_packet_out(GstSctpAssociation * _association,const guint8 * buf,gsize length,gpointer user_data)804 on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
805 gsize length, gpointer user_data)
806 {
807 GstSctpEnc *self = user_data;
808 GstBuffer *gstbuf;
809 GstDataQueueItem *item;
810 GList *pending_pads, *l;
811 GstSctpEncPad *sctpenc_pad;
812
813 gstbuf = gst_buffer_new_wrapped (g_memdup (buf, length), length);
814
815 item = g_new0 (GstDataQueueItem, 1);
816 item->object = GST_MINI_OBJECT (gstbuf);
817 item->size = length;
818 item->visible = TRUE;
819 item->destroy = (GDestroyNotify) data_queue_item_free;
820
821 if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
822 item->destroy (item);
823 GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
824 }
825
826 /* Wake up pads in the order they waited, oldest pad first */
827 GST_OBJECT_LOCK (self);
828 pending_pads = NULL;
829 while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
830 pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
831 }
832 GST_OBJECT_UNLOCK (self);
833
834 for (l = pending_pads; l; l = l->next) {
835 sctpenc_pad = l->data;
836 g_mutex_lock (&sctpenc_pad->lock);
837 g_cond_signal (&sctpenc_pad->cond);
838 g_mutex_unlock (&sctpenc_pad->lock);
839 }
840 g_list_free (pending_pads);
841 }
842
843 static void
stop_srcpad_task(GstPad * pad,GstSctpEnc * self)844 stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
845 {
846 gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
847 gst_data_queue_flush (self->outbound_sctp_packet_queue);
848 gst_pad_stop_task (pad);
849 }
850
851 static void
remove_sinkpad(const GValue * item,gpointer user_data)852 remove_sinkpad (const GValue * item, gpointer user_data)
853 {
854 GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
855 GstSctpEnc *self = user_data;
856
857 gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
858 }
859
860 static void
sctpenc_cleanup(GstSctpEnc * self)861 sctpenc_cleanup (GstSctpEnc * self)
862 {
863 GstIterator *it;
864
865 /* FIXME: make this threadsafe */
866 /* gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL); */
867
868 g_signal_handler_disconnect (self->sctp_association,
869 self->signal_handler_state_changed);
870 stop_srcpad_task (self->src_pad, self);
871 gst_sctp_association_force_close (self->sctp_association);
872 g_object_unref (self->sctp_association);
873 self->sctp_association = NULL;
874
875 it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
876 while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
877 gst_iterator_resync (it);
878 gst_iterator_free (it);
879 g_queue_clear (&self->pending_pads);
880 }
881
882 static void
get_config_from_caps(const GstCaps * caps,gboolean * ordered,GstSctpAssociationPartialReliability * reliability,guint32 * reliability_param,guint32 * ppid,gboolean * ppid_available)883 get_config_from_caps (const GstCaps * caps, gboolean * ordered,
884 GstSctpAssociationPartialReliability * reliability,
885 guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
886 {
887 GstStructure *s;
888 guint i, n;
889
890 *ordered = TRUE;
891 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
892 *reliability_param = 0;
893 *ppid_available = FALSE;
894
895 n = gst_caps_get_size (caps);
896 for (i = 0; i < n; i++) {
897 s = gst_caps_get_structure (caps, i);
898 if (gst_structure_has_field (s, "ordered")) {
899 const GValue *v = gst_structure_get_value (s, "ordered");
900 *ordered = g_value_get_boolean (v);
901 }
902 if (gst_structure_has_field (s, "partially-reliability")) {
903 const GValue *v = gst_structure_get_value (s, "partially-reliability");
904 const gchar *reliability_string = g_value_get_string (v);
905
906 if (!g_strcmp0 (reliability_string, "none"))
907 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
908 else if (!g_strcmp0 (reliability_string, "ttl"))
909 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
910 else if (!g_strcmp0 (reliability_string, "buf"))
911 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
912 else if (!g_strcmp0 (reliability_string, "rtx"))
913 *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
914 }
915 if (gst_structure_has_field (s, "reliability-parameter")) {
916 const GValue *v = gst_structure_get_value (s, "reliability-parameter");
917 *reliability_param = g_value_get_uint (v);
918 }
919 if (gst_structure_has_field (s, "ppid")) {
920 const GValue *v = gst_structure_get_value (s, "ppid");
921 *ppid = g_value_get_uint (v);
922 *ppid_available = TRUE;
923 }
924 }
925 }
926
927 static guint64
on_get_stream_bytes_sent(GstSctpEnc * self,guint stream_id)928 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
929 {
930 gchar *pad_name;
931 GstPad *pad;
932 GstSctpEncPad *sctpenc_pad;
933 guint64 bytes_sent;
934
935 pad_name = g_strdup_printf ("sink_%u", stream_id);
936 pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
937 g_free (pad_name);
938
939 if (!pad) {
940 GST_DEBUG_OBJECT (self,
941 "Buffered amount requested on a stream that does not exist!");
942 return 0;
943 }
944
945 sctpenc_pad = GST_SCTP_ENC_PAD (pad);
946
947 g_mutex_lock (&sctpenc_pad->lock);
948 bytes_sent = sctpenc_pad->bytes_sent;
949 g_mutex_unlock (&sctpenc_pad->lock);
950
951 gst_object_unref (sctpenc_pad);
952
953 return bytes_sent;
954 }
955