1 /*
2  * Copyright (c) 2015, Collabora Ltd.
3  *
4  * Redistribution and use in source and binary forms, with or without modification,
5  * are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this
11  * list of conditions and the following disclaimer in the documentation and/or other
12  * materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23  * OF SUCH DAMAGE.
24  */
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 #include "gstsctpenc.h"
30 
31 #include <gst/sctp/sctpsendmeta.h>
32 #include <stdio.h>
33 
/* Debug category for this element. GST_CAT_DEFAULT makes the GST_*_OBJECT
 * logging macros used in this file log to it. */
GST_DEBUG_CATEGORY_STATIC (gst_sctp_enc_debug_category);
#define GST_CAT_DEFAULT gst_sctp_enc_debug_category

/* Rename the parent-class pointer generated by G_DEFINE_TYPE to the short
 * name "parent_class" that the rest of this file uses when chaining up. */
#define gst_sctp_enc_parent_class parent_class
G_DEFINE_TYPE (GstSctpEnc, gst_sctp_enc, GST_TYPE_ELEMENT);
39 
/* Request sink pads named "sink_%u"; any caps are accepted. The number in the
 * pad name is parsed as the SCTP stream id when the pad is requested. */
static GstStaticPadTemplate sink_template =
GST_STATIC_PAD_TEMPLATE ("sink_%u", GST_PAD_SINK,
    GST_PAD_REQUEST, GST_STATIC_CAPS_ANY);

/* Single always-present source pad producing "application/x-sctp". */
static GstStaticPadTemplate src_template =
GST_STATIC_PAD_TEMPLATE ("src", GST_PAD_SRC,
    GST_PAD_ALWAYS, GST_STATIC_CAPS ("application/x-sctp"));
47 
/* Indices into signals[]; the ids are filled in by g_signal_new() in
 * gst_sctp_enc_class_init(). */
enum
{
  SIGNAL_SCTP_ASSOCIATION_ESTABLISHED,  /* "sctp-association-established" */
  SIGNAL_GET_STREAM_BYTES_SENT, /* "bytes-sent" action signal */
  NUM_SIGNALS
};

static guint signals[NUM_SIGNALS];
56 
/* GObject property ids; PROP_0 is the reserved "no property" id. The values
 * index properties[], which is installed in gst_sctp_enc_class_init(). */
enum
{
  PROP_0,

  PROP_GST_SCTP_ASSOCIATION_ID, /* "sctp-association-id" */
  PROP_REMOTE_SCTP_PORT,        /* "remote-sctp-port" */
  PROP_USE_SOCK_STREAM,         /* "use-sock-stream" */

  NUM_PROPERTIES
};

static GParamSpec *properties[NUM_PROPERTIES];
69 
/* Default property / per-pad values. */
#define DEFAULT_GST_SCTP_ASSOCIATION_ID 1
#define DEFAULT_REMOTE_SCTP_PORT 0
#define DEFAULT_GST_SCTP_ORDERED TRUE
#define DEFAULT_SCTP_PPID 1
#define DEFAULT_USE_SOCK_STREAM FALSE

/* Retry interval of the chain function after a failed send. It is added to
 * g_get_monotonic_time(), i.e. it is expressed in microseconds (100 ms). */
#define BUFFER_FULL_SLEEP_TIME 100000
77 
/* Private GstPad subclass used for the request sink pads. The usual GObject
 * cast / type-check macros follow. */
GType gst_sctp_enc_pad_get_type (void);

#define GST_TYPE_SCTP_ENC_PAD (gst_sctp_enc_pad_get_type())
#define GST_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST((obj), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPad))
#define GST_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST((klass), GST_TYPE_SCTP_ENC_PAD, GstSctpEncPadClass))
#define GST_IS_SCTP_ENC_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE((obj), GST_TYPE_SCTP_ENC_PAD))
#define GST_IS_SCTP_ENC_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE((klass), GST_TYPE_SCTP_ENC_PAD))

typedef struct _GstSctpEncPad GstSctpEncPad;
/* The pad subclass adds no class members, so the class struct is GstPadClass. */
typedef GstPadClass GstSctpEncPadClass;
88 
/* Per-sink-pad state: one instance per requested "sink_%u" pad. */
struct _GstSctpEncPad
{
  GstPad parent;

  /* SCTP stream id, parsed from the pad name when the pad is requested */
  guint16 stream_id;
  /* Send parameters, taken from the caps (request caps or CAPS event); a
   * GstSctpSendMeta on a buffer overrides them for that buffer */
  gboolean ordered;
  guint32 ppid;
  GstSctpAssociationPartialReliability reliability;
  guint32 reliability_param;

  /* Sum of the sizes of all buffers successfully handed to the association */
  guint64 bytes_sent;

  /* lock protects flushing and bytes_sent; cond wakes a chain function that
   * is waiting to retry a send */
  GMutex lock;
  GCond cond;
  gboolean flushing;
};

G_DEFINE_TYPE (GstSctpEncPad, gst_sctp_enc_pad, GST_TYPE_PAD);
107 
108 static void
gst_sctp_enc_pad_finalize(GObject * object)109 gst_sctp_enc_pad_finalize (GObject * object)
110 {
111   GstSctpEncPad *self = GST_SCTP_ENC_PAD (object);
112 
113   g_cond_clear (&self->cond);
114   g_mutex_clear (&self->lock);
115 
116   G_OBJECT_CLASS (gst_sctp_enc_pad_parent_class)->finalize (object);
117 }
118 
119 static void
gst_sctp_enc_pad_class_init(GstSctpEncPadClass * klass)120 gst_sctp_enc_pad_class_init (GstSctpEncPadClass * klass)
121 {
122   GObjectClass *gobject_class = (GObjectClass *) klass;
123 
124   gobject_class->finalize = gst_sctp_enc_pad_finalize;
125 }
126 
127 static void
gst_sctp_enc_pad_init(GstSctpEncPad * self)128 gst_sctp_enc_pad_init (GstSctpEncPad * self)
129 {
130   g_mutex_init (&self->lock);
131   g_cond_init (&self->cond);
132   self->flushing = FALSE;
133 }
134 
/* Forward declarations. */

/* GObject / GstElement virtual methods */
static void gst_sctp_enc_finalize (GObject * object);
static void gst_sctp_enc_set_property (GObject * object, guint prop_id,
    const GValue * value, GParamSpec * pspec);
static void gst_sctp_enc_get_property (GObject * object, guint prop_id,
    GValue * value, GParamSpec * pspec);
static GstStateChangeReturn gst_sctp_enc_change_state (GstElement * element,
    GstStateChange transition);
static GstPad *gst_sctp_enc_request_new_pad (GstElement * element,
    GstPadTemplate * template, const gchar * name, const GstCaps * caps);
static void gst_sctp_enc_release_pad (GstElement * element, GstPad * pad);
/* Pad functions: src pad task, sink chain and event handlers */
static void gst_sctp_enc_srcpad_loop (GstPad * pad);
static GstFlowReturn gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent,
    GstBuffer * buffer);
static gboolean gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
static gboolean gst_sctp_enc_src_event (GstPad * pad, GstObject * parent,
    GstEvent * event);
/* "notify::state" handler connected to the SCTP association */
static void on_sctp_association_state_changed (GstSctpAssociation *
    sctp_association, GParamSpec * pspec, GstSctpEnc * self);

/* Internal helpers */
static gboolean configure_association (GstSctpEnc * self);
static void on_sctp_packet_out (GstSctpAssociation * sctp_association,
    const guint8 * buf, gsize length, gpointer user_data);
static void stop_srcpad_task (GstPad * pad, GstSctpEnc * self);
static void sctpenc_cleanup (GstSctpEnc * self);
static void get_config_from_caps (const GstCaps * caps, gboolean * ordered,
    GstSctpAssociationPartialReliability * reliability,
    guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available);
/* Class handler of the "bytes-sent" action signal */
static guint64 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id);
164 
165 static void
gst_sctp_enc_class_init(GstSctpEncClass * klass)166 gst_sctp_enc_class_init (GstSctpEncClass * klass)
167 {
168   GObjectClass *gobject_class;
169   GstElementClass *element_class;
170 
171   gobject_class = (GObjectClass *) klass;
172   element_class = (GstElementClass *) klass;
173 
174   GST_DEBUG_CATEGORY_INIT (gst_sctp_enc_debug_category,
175       "sctpenc", 0, "debug category for sctpenc element");
176 
177   gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
178       gst_static_pad_template_get (&src_template));
179   gst_element_class_add_pad_template (GST_ELEMENT_CLASS (klass),
180       gst_static_pad_template_get (&sink_template));
181 
182   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_sctp_enc_finalize);
183   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_set_property);
184   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_sctp_enc_get_property);
185 
186   element_class->change_state = GST_DEBUG_FUNCPTR (gst_sctp_enc_change_state);
187   element_class->request_new_pad =
188       GST_DEBUG_FUNCPTR (gst_sctp_enc_request_new_pad);
189   element_class->release_pad = GST_DEBUG_FUNCPTR (gst_sctp_enc_release_pad);
190 
191   properties[PROP_GST_SCTP_ASSOCIATION_ID] =
192       g_param_spec_uint ("sctp-association-id",
193       "SCTP Association ID",
194       "Every encoder/decoder pair should have the same, unique, sctp-association-id. "
195       "This value must be set before any pads are requested.",
196       0, G_MAXUINT, DEFAULT_GST_SCTP_ASSOCIATION_ID,
197       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
198 
199   properties[PROP_REMOTE_SCTP_PORT] =
200       g_param_spec_uint ("remote-sctp-port",
201       "Remote SCTP port",
202       "Sctp remote sctp port for the sctp association. The local port is configured via the "
203       "GstSctpDec element.",
204       0, G_MAXUSHORT, DEFAULT_REMOTE_SCTP_PORT,
205       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
206 
207   properties[PROP_USE_SOCK_STREAM] =
208       g_param_spec_boolean ("use-sock-stream",
209       "Use sock-stream",
210       "When set to TRUE, a sequenced, reliable, connection-based connection is used."
211       "When TRUE the partial reliability parameters of the channel are ignored.",
212       DEFAULT_USE_SOCK_STREAM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
213 
214   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
215 
216   signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED] =
217       g_signal_new ("sctp-association-established",
218       G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST,
219       G_STRUCT_OFFSET (GstSctpEncClass, on_sctp_association_is_established),
220       NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_BOOLEAN);
221 
222   signals[SIGNAL_GET_STREAM_BYTES_SENT] = g_signal_new ("bytes-sent",
223       G_TYPE_FROM_CLASS (gobject_class), G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
224       G_STRUCT_OFFSET (GstSctpEncClass, on_get_stream_bytes_sent), NULL, NULL,
225       g_cclosure_marshal_generic, G_TYPE_UINT64, 1, G_TYPE_UINT);
226 
227   klass->on_get_stream_bytes_sent =
228       GST_DEBUG_FUNCPTR (on_get_stream_bytes_sent);
229 
230   gst_element_class_set_static_metadata (element_class,
231       "SCTP Encoder",
232       "Encoder/Network/SCTP",
233       "Encodes packets with SCTP",
234       "George Kiagiadakis <george.kiagiadakis@collabora.com>");
235 }
236 
237 static gboolean
data_queue_check_full_cb(GstDataQueue * queue,guint visible,guint bytes,guint64 time,gpointer user_data)238 data_queue_check_full_cb (GstDataQueue * queue, guint visible, guint bytes,
239     guint64 time, gpointer user_data)
240 {
241   /* TODO: When are we considered full? */
242   return FALSE;
243 }
244 
245 static void
data_queue_empty_cb(GstDataQueue * queue,gpointer user_data)246 data_queue_empty_cb (GstDataQueue * queue, gpointer user_data)
247 {
248 }
249 
250 static void
data_queue_full_cb(GstDataQueue * queue,gpointer user_data)251 data_queue_full_cb (GstDataQueue * queue, gpointer user_data)
252 {
253 }
254 
255 static void
gst_sctp_enc_init(GstSctpEnc * self)256 gst_sctp_enc_init (GstSctpEnc * self)
257 {
258   self->sctp_association_id = DEFAULT_GST_SCTP_ASSOCIATION_ID;
259   self->remote_sctp_port = DEFAULT_REMOTE_SCTP_PORT;
260 
261   self->sctp_association = NULL;
262   self->outbound_sctp_packet_queue =
263       gst_data_queue_new (data_queue_check_full_cb, data_queue_full_cb,
264       data_queue_empty_cb, NULL);
265 
266   self->src_pad = gst_pad_new_from_static_template (&src_template, "src");
267   gst_pad_set_event_function (self->src_pad,
268       GST_DEBUG_FUNCPTR ((GstPadEventFunction) gst_sctp_enc_src_event));
269   gst_element_add_pad (GST_ELEMENT (self), self->src_pad);
270 
271   g_queue_init (&self->pending_pads);
272 }
273 
274 static void
gst_sctp_enc_finalize(GObject * object)275 gst_sctp_enc_finalize (GObject * object)
276 {
277   GstSctpEnc *self = GST_SCTP_ENC (object);
278 
279   g_queue_clear (&self->pending_pads);
280   gst_object_unref (self->outbound_sctp_packet_queue);
281 
282   G_OBJECT_CLASS (parent_class)->finalize (object);
283 }
284 
285 static void
gst_sctp_enc_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)286 gst_sctp_enc_set_property (GObject * object, guint prop_id,
287     const GValue * value, GParamSpec * pspec)
288 {
289   GstSctpEnc *self = GST_SCTP_ENC (object);
290 
291   switch (prop_id) {
292     case PROP_GST_SCTP_ASSOCIATION_ID:
293       self->sctp_association_id = g_value_get_uint (value);
294       break;
295     case PROP_REMOTE_SCTP_PORT:
296       self->remote_sctp_port = g_value_get_uint (value);
297       break;
298     case PROP_USE_SOCK_STREAM:
299       self->use_sock_stream = g_value_get_boolean (value);
300       break;
301     default:
302       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
303       break;
304   }
305 }
306 
307 static void
gst_sctp_enc_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)308 gst_sctp_enc_get_property (GObject * object, guint prop_id, GValue * value,
309     GParamSpec * pspec)
310 {
311   GstSctpEnc *self = GST_SCTP_ENC (object);
312 
313   switch (prop_id) {
314     case PROP_GST_SCTP_ASSOCIATION_ID:
315       g_value_set_uint (value, self->sctp_association_id);
316       break;
317     case PROP_REMOTE_SCTP_PORT:
318       g_value_set_uint (value, self->remote_sctp_port);
319       break;
320     case PROP_USE_SOCK_STREAM:
321       g_value_set_boolean (value, self->use_sock_stream);
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
325       break;
326   }
327 }
328 
329 static GstStateChangeReturn
gst_sctp_enc_change_state(GstElement * element,GstStateChange transition)330 gst_sctp_enc_change_state (GstElement * element, GstStateChange transition)
331 {
332   GstSctpEnc *self = GST_SCTP_ENC (element);
333   GstStateChangeReturn ret = GST_STATE_CHANGE_FAILURE;
334   gboolean res = TRUE;
335 
336   switch (transition) {
337     case GST_STATE_CHANGE_NULL_TO_READY:
338       break;
339     case GST_STATE_CHANGE_READY_TO_PAUSED:
340       self->need_segment = self->need_stream_start_caps = TRUE;
341       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
342       res = configure_association (self);
343       break;
344     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
345       break;
346     case GST_STATE_CHANGE_PAUSED_TO_READY:
347       sctpenc_cleanup (self);
348       break;
349     case GST_STATE_CHANGE_READY_TO_NULL:
350       break;
351     default:
352       break;
353   }
354 
355   if (res)
356     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
357 
358   switch (transition) {
359     case GST_STATE_CHANGE_NULL_TO_READY:
360       break;
361     case GST_STATE_CHANGE_READY_TO_PAUSED:
362       gst_pad_start_task (self->src_pad,
363           (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
364       break;
365     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
366       break;
367     case GST_STATE_CHANGE_PAUSED_TO_READY:
368       break;
369     case GST_STATE_CHANGE_READY_TO_NULL:
370       break;
371     default:
372       break;
373   }
374 
375   return ret;
376 }
377 
378 static GstPad *
gst_sctp_enc_request_new_pad(GstElement * element,GstPadTemplate * template,const gchar * new_pad_name,const GstCaps * caps)379 gst_sctp_enc_request_new_pad (GstElement * element, GstPadTemplate * template,
380     const gchar * new_pad_name, const GstCaps * caps)
381 {
382   GstSctpEnc *self = GST_SCTP_ENC (element);
383   GstPad *new_pad = NULL;
384   GstSctpEncPad *sctpenc_pad;
385   guint32 stream_id;
386   gint state;
387   guint32 new_ppid;
388   gboolean is_new_ppid;
389 
390   g_object_get (self->sctp_association, "state", &state, NULL);
391 
392   if (state != GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
393     g_warning
394         ("The SCTP association must be established before a new stream can be created");
395     goto invalid_state;
396   }
397 
398   if (!template)
399     goto invalid_parameter;
400 
401   if (!new_pad_name || (sscanf (new_pad_name, "sink_%u", &stream_id) != 1)
402       || stream_id > 65534)     /* 65535 is not a valid stream id */
403     goto invalid_parameter;
404 
405   new_pad = gst_element_get_static_pad (element, new_pad_name);
406   if (new_pad) {
407     gst_object_unref (new_pad);
408     new_pad = NULL;
409     goto invalid_parameter;
410   }
411 
412   new_pad =
413       g_object_new (GST_TYPE_SCTP_ENC_PAD, "name", new_pad_name, "direction",
414       template->direction, "template", template, NULL);
415   gst_pad_set_chain_function (new_pad,
416       GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_chain));
417   gst_pad_set_event_function (new_pad,
418       GST_DEBUG_FUNCPTR (gst_sctp_enc_sink_event));
419 
420   sctpenc_pad = GST_SCTP_ENC_PAD (new_pad);
421   sctpenc_pad->stream_id = stream_id;
422   sctpenc_pad->ppid = DEFAULT_SCTP_PPID;
423 
424   if (caps) {
425     get_config_from_caps (caps, &sctpenc_pad->ordered,
426         &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
427         &is_new_ppid);
428 
429     if (is_new_ppid)
430       sctpenc_pad->ppid = new_ppid;
431   }
432 
433   sctpenc_pad->flushing = FALSE;
434 
435   if (!gst_pad_set_active (new_pad, TRUE))
436     goto error_cleanup;
437 
438   if (!gst_element_add_pad (element, new_pad))
439     goto error_cleanup;
440 
441 invalid_state:
442 invalid_parameter:
443   return new_pad;
444 error_cleanup:
445   gst_object_unref (new_pad);
446   return NULL;
447 }
448 
449 static void
gst_sctp_enc_release_pad(GstElement * element,GstPad * pad)450 gst_sctp_enc_release_pad (GstElement * element, GstPad * pad)
451 {
452   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
453   GstSctpEnc *self;
454   guint stream_id = 0;
455 
456   self = GST_SCTP_ENC (element);
457 
458   g_mutex_lock (&sctpenc_pad->lock);
459   sctpenc_pad->flushing = TRUE;
460   g_cond_signal (&sctpenc_pad->cond);
461   g_mutex_unlock (&sctpenc_pad->lock);
462 
463   stream_id = sctpenc_pad->stream_id;
464   gst_pad_set_active (pad, FALSE);
465 
466   if (self->sctp_association)
467     gst_sctp_association_reset_stream (self->sctp_association, stream_id);
468 
469   gst_element_remove_pad (element, pad);
470 }
471 
472 static void
gst_sctp_enc_srcpad_loop(GstPad * pad)473 gst_sctp_enc_srcpad_loop (GstPad * pad)
474 {
475   GstSctpEnc *self = GST_SCTP_ENC (GST_PAD_PARENT (pad));
476   GstFlowReturn flow_ret;
477   GstDataQueueItem *item;
478 
479   if (self->need_stream_start_caps) {
480     gchar s_id[32];
481     GstCaps *caps;
482 
483     g_snprintf (s_id, sizeof (s_id), "sctpenc-%08x", g_random_int ());
484     gst_pad_push_event (self->src_pad, gst_event_new_stream_start (s_id));
485 
486     caps = gst_caps_new_empty_simple ("application/x-sctp");
487     gst_pad_set_caps (self->src_pad, caps);
488     gst_caps_unref (caps);
489 
490     self->need_stream_start_caps = FALSE;
491   }
492 
493   if (self->need_segment) {
494     GstSegment segment;
495 
496     gst_segment_init (&segment, GST_FORMAT_BYTES);
497     gst_pad_push_event (self->src_pad, gst_event_new_segment (&segment));
498 
499     self->need_segment = FALSE;
500   }
501 
502   if (gst_data_queue_pop (self->outbound_sctp_packet_queue, &item)) {
503     flow_ret = gst_pad_push (self->src_pad, GST_BUFFER (item->object));
504     item->object = NULL;
505 
506     if (G_UNLIKELY (flow_ret == GST_FLOW_FLUSHING
507             || flow_ret == GST_FLOW_NOT_LINKED)) {
508       GST_DEBUG_OBJECT (pad, "Push failed on packet source pad. Error: %s",
509           gst_flow_get_name (flow_ret));
510     } else if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
511       GST_ERROR_OBJECT (pad, "Push failed on packet source pad. Error: %s",
512           gst_flow_get_name (flow_ret));
513     }
514 
515     if (G_UNLIKELY (flow_ret != GST_FLOW_OK)) {
516       GST_DEBUG_OBJECT (pad, "Pausing task because of an error");
517       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
518       gst_data_queue_flush (self->outbound_sctp_packet_queue);
519       gst_pad_pause_task (pad);
520     }
521 
522     item->destroy (item);
523   } else {
524     GST_DEBUG_OBJECT (pad, "Pausing task because we're flushing");
525     gst_pad_pause_task (pad);
526   }
527 }
528 
529 static GstFlowReturn
gst_sctp_enc_sink_chain(GstPad * pad,GstObject * parent,GstBuffer * buffer)530 gst_sctp_enc_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
531 {
532   GstSctpEnc *self = GST_SCTP_ENC (parent);
533   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
534   GstMapInfo map;
535   guint32 ppid;
536   gboolean ordered;
537   GstSctpAssociationPartialReliability pr;
538   guint32 pr_param;
539   gpointer state = NULL;
540   GstMeta *meta;
541   const GstMetaInfo *meta_info = GST_SCTP_SEND_META_INFO;
542   GstFlowReturn flow_ret = GST_FLOW_ERROR;
543 
544   ppid = sctpenc_pad->ppid;
545   ordered = sctpenc_pad->ordered;
546   pr = sctpenc_pad->reliability;
547   pr_param = sctpenc_pad->reliability_param;
548 
549   while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
550     if (meta->info->api == meta_info->api) {
551       GstSctpSendMeta *sctp_send_meta = (GstSctpSendMeta *) meta;
552 
553       ppid = sctp_send_meta->ppid;
554       ordered = sctp_send_meta->ordered;
555       pr_param = sctp_send_meta->pr_param;
556       switch (sctp_send_meta->pr) {
557         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_NONE:
558           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
559           break;
560         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_RTX:
561           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
562           break;
563         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_BUF:
564           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
565           break;
566         case GST_SCTP_SEND_META_PARTIAL_RELIABILITY_TTL:
567           pr = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
568           break;
569       }
570       break;
571     }
572   }
573 
574   if (!gst_buffer_map (buffer, &map, GST_MAP_READ)) {
575     g_warning ("Could not map GstBuffer");
576     goto error;
577   }
578 
579   g_mutex_lock (&sctpenc_pad->lock);
580   while (!sctpenc_pad->flushing) {
581     gboolean data_sent = FALSE;
582 
583     g_mutex_unlock (&sctpenc_pad->lock);
584 
585     data_sent =
586         gst_sctp_association_send_data (self->sctp_association, map.data,
587         map.size, sctpenc_pad->stream_id, ppid, ordered, pr, pr_param);
588 
589     g_mutex_lock (&sctpenc_pad->lock);
590     if (data_sent) {
591       sctpenc_pad->bytes_sent += map.size;
592       break;
593     } else if (!sctpenc_pad->flushing) {
594       gint64 end_time = g_get_monotonic_time () + BUFFER_FULL_SLEEP_TIME;
595 
596       /* The buffer was probably full. Retry in a while */
597       GST_OBJECT_LOCK (self);
598       g_queue_push_tail (&self->pending_pads, sctpenc_pad);
599       GST_OBJECT_UNLOCK (self);
600 
601       g_cond_wait_until (&sctpenc_pad->cond, &sctpenc_pad->lock, end_time);
602 
603       GST_OBJECT_LOCK (self);
604       g_queue_remove (&self->pending_pads, sctpenc_pad);
605       GST_OBJECT_UNLOCK (self);
606     }
607   }
608   flow_ret = sctpenc_pad->flushing ? GST_FLOW_FLUSHING : GST_FLOW_OK;
609   g_mutex_unlock (&sctpenc_pad->lock);
610 
611   gst_buffer_unmap (buffer, &map);
612 error:
613   gst_buffer_unref (buffer);
614   return flow_ret;
615 }
616 
617 static gboolean
gst_sctp_enc_sink_event(GstPad * pad,GstObject * parent,GstEvent * event)618 gst_sctp_enc_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
619 {
620   GstSctpEncPad *sctpenc_pad = GST_SCTP_ENC_PAD (pad);
621   gboolean ret, is_new_ppid;
622   guint32 new_ppid;
623 
624   switch (GST_EVENT_TYPE (event)) {
625     case GST_EVENT_CAPS:{
626       GstCaps *caps;
627 
628       gst_event_parse_caps (event, &caps);
629       get_config_from_caps (caps, &sctpenc_pad->ordered,
630           &sctpenc_pad->reliability, &sctpenc_pad->reliability_param, &new_ppid,
631           &is_new_ppid);
632       if (is_new_ppid)
633         sctpenc_pad->ppid = new_ppid;
634       gst_event_unref (event);
635       ret = TRUE;
636       break;
637     }
638     case GST_EVENT_STREAM_START:
639     case GST_EVENT_SEGMENT:
640       /* Drop these, we create our own */
641       ret = TRUE;
642       gst_event_unref (event);
643       break;
644     case GST_EVENT_EOS:
645       /* Drop this, we're never EOS until shut down */
646       ret = TRUE;
647       gst_event_unref (event);
648       break;
649     case GST_EVENT_FLUSH_START:
650       g_mutex_lock (&sctpenc_pad->lock);
651       sctpenc_pad->flushing = TRUE;
652       g_cond_signal (&sctpenc_pad->cond);
653       g_mutex_unlock (&sctpenc_pad->lock);
654 
655       ret = gst_pad_event_default (pad, parent, event);
656       break;
657     case GST_EVENT_FLUSH_STOP:
658       sctpenc_pad->flushing = FALSE;
659       ret = gst_pad_event_default (pad, parent, event);
660       break;
661     default:
662       ret = gst_pad_event_default (pad, parent, event);
663       break;
664   }
665   return ret;
666 }
667 
668 static void
flush_sinkpad(const GValue * item,gpointer user_data)669 flush_sinkpad (const GValue * item, gpointer user_data)
670 {
671   GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
672   gboolean flush = GPOINTER_TO_INT (user_data);
673 
674   if (flush) {
675     g_mutex_lock (&sctpenc_pad->lock);
676     sctpenc_pad->flushing = TRUE;
677     g_cond_signal (&sctpenc_pad->cond);
678     g_mutex_unlock (&sctpenc_pad->lock);
679   } else {
680     sctpenc_pad->flushing = FALSE;
681   }
682 }
683 
684 static gboolean
gst_sctp_enc_src_event(GstPad * pad,GstObject * parent,GstEvent * event)685 gst_sctp_enc_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
686 {
687   GstSctpEnc *self = GST_SCTP_ENC (parent);
688   gboolean ret;
689 
690   switch (GST_EVENT_TYPE (event)) {
691     case GST_EVENT_FLUSH_START:{
692       GstIterator *it;
693 
694       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
695       gst_data_queue_flush (self->outbound_sctp_packet_queue);
696 
697       it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
698       while (gst_iterator_foreach (it, flush_sinkpad,
699               GINT_TO_POINTER (TRUE)) == GST_ITERATOR_RESYNC)
700         gst_iterator_resync (it);
701       gst_iterator_free (it);
702 
703       ret = gst_pad_event_default (pad, parent, event);
704       break;
705     }
706     case GST_EVENT_RECONFIGURE:
707     case GST_EVENT_FLUSH_STOP:{
708       GstIterator *it;
709 
710       it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
711       while (gst_iterator_foreach (it, flush_sinkpad,
712               GINT_TO_POINTER (FALSE)) == GST_ITERATOR_RESYNC)
713         gst_iterator_resync (it);
714       gst_iterator_free (it);
715 
716       gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, FALSE);
717       self->need_segment = TRUE;
718       gst_pad_start_task (self->src_pad,
719           (GstTaskFunction) gst_sctp_enc_srcpad_loop, self->src_pad, NULL);
720 
721       ret = gst_pad_event_default (pad, parent, event);
722       break;
723     }
724     default:
725       ret = gst_pad_event_default (pad, parent, event);
726       break;
727   }
728   return ret;
729 }
730 
731 static gboolean
configure_association(GstSctpEnc * self)732 configure_association (GstSctpEnc * self)
733 {
734   gint state;
735 
736   self->sctp_association = gst_sctp_association_get (self->sctp_association_id);
737 
738   g_object_get (self->sctp_association, "state", &state, NULL);
739 
740   if (state != GST_SCTP_ASSOCIATION_STATE_NEW) {
741     GST_WARNING_OBJECT (self,
742         "Could not configure SCTP association. Association already in use!");
743     g_object_unref (self->sctp_association);
744     self->sctp_association = NULL;
745     goto error;
746   }
747 
748   self->signal_handler_state_changed =
749       g_signal_connect_object (self->sctp_association, "notify::state",
750       G_CALLBACK (on_sctp_association_state_changed), self, 0);
751 
752   g_object_bind_property (self, "remote-sctp-port", self->sctp_association,
753       "remote-port", G_BINDING_SYNC_CREATE);
754 
755   g_object_bind_property (self, "use-sock-stream", self->sctp_association,
756       "use-sock-stream", G_BINDING_SYNC_CREATE);
757 
758   gst_sctp_association_set_on_packet_out (self->sctp_association,
759       on_sctp_packet_out, self);
760 
761   return TRUE;
762 error:
763   return FALSE;
764 }
765 
766 static void
on_sctp_association_state_changed(GstSctpAssociation * sctp_association,GParamSpec * pspec,GstSctpEnc * self)767 on_sctp_association_state_changed (GstSctpAssociation * sctp_association,
768     GParamSpec * pspec, GstSctpEnc * self)
769 {
770   gint state;
771 
772   g_object_get (sctp_association, "state", &state, NULL);
773   switch (state) {
774     case GST_SCTP_ASSOCIATION_STATE_NEW:
775       break;
776     case GST_SCTP_ASSOCIATION_STATE_READY:
777       gst_sctp_association_start (sctp_association);
778       break;
779     case GST_SCTP_ASSOCIATION_STATE_CONNECTING:
780       break;
781     case GST_SCTP_ASSOCIATION_STATE_CONNECTED:
782       g_signal_emit_by_name (self, "sctp-association-established", TRUE);
783       break;
784     case GST_SCTP_ASSOCIATION_STATE_DISCONNECTING:
785       g_signal_emit (self, signals[SIGNAL_SCTP_ASSOCIATION_ESTABLISHED], 0,
786           FALSE);
787       break;
788     case GST_SCTP_ASSOCIATION_STATE_DISCONNECTED:
789       break;
790     case GST_SCTP_ASSOCIATION_STATE_ERROR:
791       break;
792   }
793 }
794 
795 static void
data_queue_item_free(GstDataQueueItem * item)796 data_queue_item_free (GstDataQueueItem * item)
797 {
798   if (item->object)
799     gst_mini_object_unref (item->object);
800   g_free (item);
801 }
802 
803 static void
on_sctp_packet_out(GstSctpAssociation * _association,const guint8 * buf,gsize length,gpointer user_data)804 on_sctp_packet_out (GstSctpAssociation * _association, const guint8 * buf,
805     gsize length, gpointer user_data)
806 {
807   GstSctpEnc *self = user_data;
808   GstBuffer *gstbuf;
809   GstDataQueueItem *item;
810   GList *pending_pads, *l;
811   GstSctpEncPad *sctpenc_pad;
812 
813   gstbuf = gst_buffer_new_wrapped (g_memdup (buf, length), length);
814 
815   item = g_new0 (GstDataQueueItem, 1);
816   item->object = GST_MINI_OBJECT (gstbuf);
817   item->size = length;
818   item->visible = TRUE;
819   item->destroy = (GDestroyNotify) data_queue_item_free;
820 
821   if (!gst_data_queue_push (self->outbound_sctp_packet_queue, item)) {
822     item->destroy (item);
823     GST_DEBUG_OBJECT (self, "Failed to push item because we're flushing");
824   }
825 
826   /* Wake up pads in the order they waited, oldest pad first */
827   GST_OBJECT_LOCK (self);
828   pending_pads = NULL;
829   while ((sctpenc_pad = g_queue_pop_tail (&self->pending_pads))) {
830     pending_pads = g_list_prepend (pending_pads, sctpenc_pad);
831   }
832   GST_OBJECT_UNLOCK (self);
833 
834   for (l = pending_pads; l; l = l->next) {
835     sctpenc_pad = l->data;
836     g_mutex_lock (&sctpenc_pad->lock);
837     g_cond_signal (&sctpenc_pad->cond);
838     g_mutex_unlock (&sctpenc_pad->lock);
839   }
840   g_list_free (pending_pads);
841 }
842 
843 static void
stop_srcpad_task(GstPad * pad,GstSctpEnc * self)844 stop_srcpad_task (GstPad * pad, GstSctpEnc * self)
845 {
846   gst_data_queue_set_flushing (self->outbound_sctp_packet_queue, TRUE);
847   gst_data_queue_flush (self->outbound_sctp_packet_queue);
848   gst_pad_stop_task (pad);
849 }
850 
851 static void
remove_sinkpad(const GValue * item,gpointer user_data)852 remove_sinkpad (const GValue * item, gpointer user_data)
853 {
854   GstSctpEncPad *sctpenc_pad = g_value_get_object (item);
855   GstSctpEnc *self = user_data;
856 
857   gst_sctp_enc_release_pad (GST_ELEMENT (self), GST_PAD (sctpenc_pad));
858 }
859 
860 static void
sctpenc_cleanup(GstSctpEnc * self)861 sctpenc_cleanup (GstSctpEnc * self)
862 {
863   GstIterator *it;
864 
865   /* FIXME: make this threadsafe */
866   /* gst_sctp_association_set_on_packet_out (self->sctp_association, NULL, NULL); */
867 
868   g_signal_handler_disconnect (self->sctp_association,
869       self->signal_handler_state_changed);
870   stop_srcpad_task (self->src_pad, self);
871   gst_sctp_association_force_close (self->sctp_association);
872   g_object_unref (self->sctp_association);
873   self->sctp_association = NULL;
874 
875   it = gst_element_iterate_sink_pads (GST_ELEMENT (self));
876   while (gst_iterator_foreach (it, remove_sinkpad, self) == GST_ITERATOR_RESYNC)
877     gst_iterator_resync (it);
878   gst_iterator_free (it);
879   g_queue_clear (&self->pending_pads);
880 }
881 
882 static void
get_config_from_caps(const GstCaps * caps,gboolean * ordered,GstSctpAssociationPartialReliability * reliability,guint32 * reliability_param,guint32 * ppid,gboolean * ppid_available)883 get_config_from_caps (const GstCaps * caps, gboolean * ordered,
884     GstSctpAssociationPartialReliability * reliability,
885     guint32 * reliability_param, guint32 * ppid, gboolean * ppid_available)
886 {
887   GstStructure *s;
888   guint i, n;
889 
890   *ordered = TRUE;
891   *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
892   *reliability_param = 0;
893   *ppid_available = FALSE;
894 
895   n = gst_caps_get_size (caps);
896   for (i = 0; i < n; i++) {
897     s = gst_caps_get_structure (caps, i);
898     if (gst_structure_has_field (s, "ordered")) {
899       const GValue *v = gst_structure_get_value (s, "ordered");
900       *ordered = g_value_get_boolean (v);
901     }
902     if (gst_structure_has_field (s, "partially-reliability")) {
903       const GValue *v = gst_structure_get_value (s, "partially-reliability");
904       const gchar *reliability_string = g_value_get_string (v);
905 
906       if (!g_strcmp0 (reliability_string, "none"))
907         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE;
908       else if (!g_strcmp0 (reliability_string, "ttl"))
909         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL;
910       else if (!g_strcmp0 (reliability_string, "buf"))
911         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF;
912       else if (!g_strcmp0 (reliability_string, "rtx"))
913         *reliability = GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX;
914     }
915     if (gst_structure_has_field (s, "reliability-parameter")) {
916       const GValue *v = gst_structure_get_value (s, "reliability-parameter");
917       *reliability_param = g_value_get_uint (v);
918     }
919     if (gst_structure_has_field (s, "ppid")) {
920       const GValue *v = gst_structure_get_value (s, "ppid");
921       *ppid = g_value_get_uint (v);
922       *ppid_available = TRUE;
923     }
924   }
925 }
926 
927 static guint64
on_get_stream_bytes_sent(GstSctpEnc * self,guint stream_id)928 on_get_stream_bytes_sent (GstSctpEnc * self, guint stream_id)
929 {
930   gchar *pad_name;
931   GstPad *pad;
932   GstSctpEncPad *sctpenc_pad;
933   guint64 bytes_sent;
934 
935   pad_name = g_strdup_printf ("sink_%u", stream_id);
936   pad = gst_element_get_static_pad (GST_ELEMENT (self), pad_name);
937   g_free (pad_name);
938 
939   if (!pad) {
940     GST_DEBUG_OBJECT (self,
941         "Buffered amount requested on a stream that does not exist!");
942     return 0;
943   }
944 
945   sctpenc_pad = GST_SCTP_ENC_PAD (pad);
946 
947   g_mutex_lock (&sctpenc_pad->lock);
948   bytes_sent = sctpenc_pad->bytes_sent;
949   g_mutex_unlock (&sctpenc_pad->lock);
950 
951   gst_object_unref (sctpenc_pad);
952 
953   return bytes_sent;
954 }
955