1 /*
2  * Copyright (c) 2015, Collabora Ltd.
3  *
4  * Redistribution and use in source and binary forms, with or without modification,
5  * are permitted provided that the following conditions are met:
6  *
7  * 1. Redistributions of source code must retain the above copyright notice, this
8  * list of conditions and the following disclaimer.
9  *
10  * 2. Redistributions in binary form must reproduce the above copyright notice, this
11  * list of conditions and the following disclaimer in the documentation and/or other
12  * materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
16  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
17  * IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
18  * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
19  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
20  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
21  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
22  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY
23  * OF SUCH DAMAGE.
24  */
25 
26 #ifdef HAVE_CONFIG_H
27 #include "config.h"
28 #endif
29 
30 #include "sctpassociation.h"
31 
32 #include <string.h>
33 #include <errno.h>
34 #include <stdlib.h>
35 
36 #define GST_SCTP_ASSOCIATION_STATE_TYPE (gst_sctp_association_state_get_type())
37 static GType
gst_sctp_association_state_get_type(void)38 gst_sctp_association_state_get_type (void)
39 {
40   static const GEnumValue values[] = {
41     {GST_SCTP_ASSOCIATION_STATE_NEW, "state-new", "state-new"},
42     {GST_SCTP_ASSOCIATION_STATE_READY, "state-ready", "state-ready"},
43     {GST_SCTP_ASSOCIATION_STATE_CONNECTING, "state-connecting",
44         "state-connecting"},
45     {GST_SCTP_ASSOCIATION_STATE_CONNECTED, "state-connected",
46         "state-connected"},
47     {GST_SCTP_ASSOCIATION_STATE_DISCONNECTING, "state-disconnecting",
48         "state-disconnecting"},
49     {GST_SCTP_ASSOCIATION_STATE_DISCONNECTED, "state-disconnected",
50         "state-disconnected"},
51     {GST_SCTP_ASSOCIATION_STATE_ERROR, "state-error", "state-error"},
52     {0, NULL, NULL}
53   };
54   static volatile GType id = 0;
55 
56   if (g_once_init_enter ((gsize *) & id)) {
57     GType _id;
58     _id = g_enum_register_static ("GstSctpAssociationState", values);
59     g_once_init_leave ((gsize *) & id, _id);
60   }
61 
62   return id;
63 }
64 
65 G_DEFINE_TYPE (GstSctpAssociation, gst_sctp_association, G_TYPE_OBJECT);
66 
67 enum
68 {
69   SIGNAL_STREAM_RESET,
70   LAST_SIGNAL
71 };
72 
73 
74 enum
75 {
76   PROP_0,
77 
78   PROP_ASSOCIATION_ID,
79   PROP_LOCAL_PORT,
80   PROP_REMOTE_PORT,
81   PROP_STATE,
82   PROP_USE_SOCK_STREAM,
83 
84   NUM_PROPERTIES
85 };
86 
87 static guint signals[LAST_SIGNAL] = { 0 };
88 
89 static GParamSpec *properties[NUM_PROPERTIES];
90 
91 #define DEFAULT_NUMBER_OF_SCTP_STREAMS 10
92 #define DEFAULT_LOCAL_SCTP_PORT 0
93 #define DEFAULT_REMOTE_SCTP_PORT 0
94 
95 static GHashTable *associations = NULL;
96 G_LOCK_DEFINE_STATIC (associations_lock);
97 static guint32 number_of_associations = 0;
98 
99 /* Interface implementations */
100 static void gst_sctp_association_finalize (GObject * object);
101 static void gst_sctp_association_set_property (GObject * object, guint prop_id,
102     const GValue * value, GParamSpec * pspec);
103 static void gst_sctp_association_get_property (GObject * object, guint prop_id,
104     GValue * value, GParamSpec * pspec);
105 
106 static struct socket *create_sctp_socket (GstSctpAssociation *
107     gst_sctp_association);
108 static struct sockaddr_conn get_sctp_socket_address (GstSctpAssociation *
109     gst_sctp_association, guint16 port);
110 static gpointer connection_thread_func (GstSctpAssociation * self);
111 static gboolean client_role_connect (GstSctpAssociation * self);
112 static int sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
113     guint8 set_df);
114 static int receive_cb (struct socket *sock, union sctp_sockstore addr,
115     void *data, size_t datalen, struct sctp_rcvinfo rcv_info, gint flags,
116     void *ulp_info);
117 static void handle_notification (GstSctpAssociation * self,
118     const union sctp_notification *notification, size_t length);
119 static void handle_association_changed (GstSctpAssociation * self,
120     const struct sctp_assoc_change *sac);
121 static void handle_stream_reset_event (GstSctpAssociation * self,
122     const struct sctp_stream_reset_event *ssr);
123 static void handle_message (GstSctpAssociation * self, guint8 * data,
124     guint32 datalen, guint16 stream_id, guint32 ppid);
125 
126 static void maybe_set_state_to_ready (GstSctpAssociation * self);
127 static void gst_sctp_association_change_state (GstSctpAssociation * self,
128     GstSctpAssociationState new_state, gboolean notify);
129 
130 static void
gst_sctp_association_class_init(GstSctpAssociationClass * klass)131 gst_sctp_association_class_init (GstSctpAssociationClass * klass)
132 {
133   GObjectClass *gobject_class;
134 
135   gobject_class = (GObjectClass *) klass;
136 
137   gobject_class->finalize = gst_sctp_association_finalize;
138   gobject_class->set_property = gst_sctp_association_set_property;
139   gobject_class->get_property = gst_sctp_association_get_property;
140 
141   signals[SIGNAL_STREAM_RESET] =
142       g_signal_new ("stream-reset", G_OBJECT_CLASS_TYPE (klass),
143       G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (GstSctpAssociationClass,
144           on_sctp_stream_reset), NULL, NULL, g_cclosure_marshal_generic,
145       G_TYPE_NONE, 1, G_TYPE_UINT);
146 
147   properties[PROP_ASSOCIATION_ID] = g_param_spec_uint ("association-id",
148       "The SCTP association-id", "The SCTP association-id.", 0, G_MAXUSHORT,
149       DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
150 
151   properties[PROP_LOCAL_PORT] = g_param_spec_uint ("local-port", "Local SCTP",
152       "The local SCTP port for this association", 0, G_MAXUSHORT,
153       DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
154 
155   properties[PROP_REMOTE_PORT] =
156       g_param_spec_uint ("remote-port", "Remote SCTP",
157       "The remote SCTP port for this association", 0, G_MAXUSHORT,
158       DEFAULT_LOCAL_SCTP_PORT, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
159 
160   properties[PROP_STATE] = g_param_spec_enum ("state", "SCTP Association state",
161       "The state of the SCTP association", GST_SCTP_ASSOCIATION_STATE_TYPE,
162       GST_SCTP_ASSOCIATION_STATE_NEW,
163       G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
164 
165   properties[PROP_USE_SOCK_STREAM] =
166       g_param_spec_boolean ("use-sock-stream", "Use sock-stream",
167       "When set to TRUE, a sequenced, reliable, connection-based connection is used."
168       "When TRUE the partial reliability parameters of the channel is ignored.",
169       FALSE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS);
170 
171   g_object_class_install_properties (gobject_class, NUM_PROPERTIES, properties);
172 }
173 
174 static void
gst_sctp_association_init(GstSctpAssociation * self)175 gst_sctp_association_init (GstSctpAssociation * self)
176 {
177   /* No need to lock mutex here as long as the function is only called from gst_sctp_association_get */
178   if (number_of_associations == 0) {
179     usrsctp_init (0, sctp_packet_out, g_print);
180 
181     /* Explicit Congestion Notification */
182     usrsctp_sysctl_set_sctp_ecn_enable (0);
183 
184     usrsctp_sysctl_set_sctp_nr_outgoing_streams_default
185         (DEFAULT_NUMBER_OF_SCTP_STREAMS);
186   }
187   number_of_associations++;
188 
189   self->local_port = DEFAULT_LOCAL_SCTP_PORT;
190   self->remote_port = DEFAULT_REMOTE_SCTP_PORT;
191   self->sctp_ass_sock = NULL;
192 
193   self->connection_thread = NULL;
194   g_mutex_init (&self->association_mutex);
195 
196   self->state = GST_SCTP_ASSOCIATION_STATE_NEW;
197 
198   self->use_sock_stream = FALSE;
199 
200   usrsctp_register_address ((void *) self);
201 }
202 
203 static void
gst_sctp_association_finalize(GObject * object)204 gst_sctp_association_finalize (GObject * object)
205 {
206   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
207 
208   G_LOCK (associations_lock);
209 
210   g_hash_table_remove (associations, GUINT_TO_POINTER (self->association_id));
211 
212   usrsctp_deregister_address ((void *) self);
213   number_of_associations--;
214   if (number_of_associations == 0) {
215     usrsctp_finish ();
216   }
217   G_UNLOCK (associations_lock);
218 
219   if (self->connection_thread)
220     g_thread_join (self->connection_thread);
221 
222   G_OBJECT_CLASS (gst_sctp_association_parent_class)->finalize (object);
223 }
224 
225 static void
gst_sctp_association_set_property(GObject * object,guint prop_id,const GValue * value,GParamSpec * pspec)226 gst_sctp_association_set_property (GObject * object, guint prop_id,
227     const GValue * value, GParamSpec * pspec)
228 {
229   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
230 
231   g_mutex_lock (&self->association_mutex);
232   if (self->state != GST_SCTP_ASSOCIATION_STATE_NEW) {
233     switch (prop_id) {
234       case PROP_LOCAL_PORT:
235       case PROP_REMOTE_PORT:
236         g_warning ("These properties cannot be set in this state");
237         goto error;
238     }
239   }
240 
241   switch (prop_id) {
242     case PROP_ASSOCIATION_ID:
243       self->association_id = g_value_get_uint (value);
244       break;
245     case PROP_LOCAL_PORT:
246       self->local_port = g_value_get_uint (value);
247       break;
248     case PROP_REMOTE_PORT:
249       self->remote_port = g_value_get_uint (value);
250       break;
251     case PROP_STATE:
252       self->state = g_value_get_enum (value);
253       break;
254     case PROP_USE_SOCK_STREAM:
255       self->use_sock_stream = g_value_get_boolean (value);
256       break;
257     default:
258       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
259       break;
260   }
261 
262   g_mutex_unlock (&self->association_mutex);
263   if (prop_id == PROP_LOCAL_PORT || prop_id == PROP_REMOTE_PORT)
264     maybe_set_state_to_ready (self);
265 
266   return;
267 
268 error:
269   g_mutex_unlock (&self->association_mutex);
270 }
271 
272 static void
maybe_set_state_to_ready(GstSctpAssociation * self)273 maybe_set_state_to_ready (GstSctpAssociation * self)
274 {
275   gboolean signal_ready_state = FALSE;
276 
277   g_mutex_lock (&self->association_mutex);
278   if ((self->state == GST_SCTP_ASSOCIATION_STATE_NEW) &&
279       (self->local_port != 0 && self->remote_port != 0)
280       && (self->packet_out_cb != NULL) && (self->packet_received_cb != NULL)) {
281     signal_ready_state = TRUE;
282     gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY,
283         FALSE);
284   }
285   g_mutex_unlock (&self->association_mutex);
286 
287   /* The reason the state is changed twice is that we do not want to change state with
288    * notification while the association_mutex is locked. If someone listens
289    * on property change and call this object a deadlock might occur.*/
290   if (signal_ready_state)
291     gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_READY,
292         TRUE);
293 
294 }
295 
296 static void
gst_sctp_association_get_property(GObject * object,guint prop_id,GValue * value,GParamSpec * pspec)297 gst_sctp_association_get_property (GObject * object, guint prop_id,
298     GValue * value, GParamSpec * pspec)
299 {
300   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (object);
301 
302   switch (prop_id) {
303     case PROP_ASSOCIATION_ID:
304       g_value_set_uint (value, self->association_id);
305       break;
306     case PROP_LOCAL_PORT:
307       g_value_set_uint (value, self->local_port);
308       break;
309     case PROP_REMOTE_PORT:
310       g_value_set_uint (value, self->remote_port);
311       break;
312     case PROP_STATE:
313       g_value_set_enum (value, self->state);
314       break;
315     case PROP_USE_SOCK_STREAM:
316       g_value_set_boolean (value, self->use_sock_stream);
317       break;
318     default:
319       G_OBJECT_WARN_INVALID_PROPERTY_ID (self, prop_id, pspec);
320       break;
321   }
322 }
323 
324 /* Public functions */
325 
326 GstSctpAssociation *
gst_sctp_association_get(guint32 association_id)327 gst_sctp_association_get (guint32 association_id)
328 {
329   GstSctpAssociation *association;
330 
331   G_LOCK (associations_lock);
332   if (!associations) {
333     associations =
334         g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, NULL);
335   }
336 
337   association =
338       g_hash_table_lookup (associations, GUINT_TO_POINTER (association_id));
339   if (!association) {
340     association =
341         g_object_new (GST_SCTP_TYPE_ASSOCIATION, "association-id",
342         association_id, NULL);
343     g_hash_table_insert (associations, GUINT_TO_POINTER (association_id),
344         association);
345   } else {
346     g_object_ref (association);
347   }
348   G_UNLOCK (associations_lock);
349   return association;
350 }
351 
352 gboolean
gst_sctp_association_start(GstSctpAssociation * self)353 gst_sctp_association_start (GstSctpAssociation * self)
354 {
355   gchar *thread_name;
356 
357   g_mutex_lock (&self->association_mutex);
358   if (self->state != GST_SCTP_ASSOCIATION_STATE_READY) {
359     g_warning ("SCTP association is in wrong state and cannot be started");
360     goto configure_required;
361   }
362 
363   if ((self->sctp_ass_sock = create_sctp_socket (self)) == NULL)
364     goto error;
365 
366   gst_sctp_association_change_state (self,
367       GST_SCTP_ASSOCIATION_STATE_CONNECTING, FALSE);
368   g_mutex_unlock (&self->association_mutex);
369 
370   /* The reason the state is changed twice is that we do not want to change state with
371    * notification while the association_mutex is locked. If someone listens
372    * on property change and call this object a deadlock might occur.*/
373   gst_sctp_association_change_state (self,
374       GST_SCTP_ASSOCIATION_STATE_CONNECTING, TRUE);
375 
376   thread_name = g_strdup_printf ("connection_thread_%u", self->association_id);
377   self->connection_thread = g_thread_new (thread_name,
378       (GThreadFunc) connection_thread_func, self);
379   g_free (thread_name);
380 
381   return TRUE;
382 error:
383   g_mutex_unlock (&self->association_mutex);
384   gst_sctp_association_change_state (self, GST_SCTP_ASSOCIATION_STATE_ERROR,
385       TRUE);
386 configure_required:
387   g_mutex_unlock (&self->association_mutex);
388   return FALSE;
389 }
390 
391 void
gst_sctp_association_set_on_packet_out(GstSctpAssociation * self,GstSctpAssociationPacketOutCb packet_out_cb,gpointer user_data)392 gst_sctp_association_set_on_packet_out (GstSctpAssociation * self,
393     GstSctpAssociationPacketOutCb packet_out_cb, gpointer user_data)
394 {
395   g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
396 
397   g_mutex_lock (&self->association_mutex);
398   if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) {
399     self->packet_out_cb = packet_out_cb;
400     self->packet_out_user_data = user_data;
401   } else {
402     /* This is to be thread safe. The Association might try to write to the closure already */
403     g_warning ("It is not possible to change packet callback in this state");
404   }
405   g_mutex_unlock (&self->association_mutex);
406 
407   maybe_set_state_to_ready (self);
408 }
409 
410 void
gst_sctp_association_set_on_packet_received(GstSctpAssociation * self,GstSctpAssociationPacketReceivedCb packet_received_cb,gpointer user_data)411 gst_sctp_association_set_on_packet_received (GstSctpAssociation * self,
412     GstSctpAssociationPacketReceivedCb packet_received_cb, gpointer user_data)
413 {
414   g_return_if_fail (GST_SCTP_IS_ASSOCIATION (self));
415 
416   g_mutex_lock (&self->association_mutex);
417   if (self->state == GST_SCTP_ASSOCIATION_STATE_NEW) {
418     self->packet_received_cb = packet_received_cb;
419     self->packet_received_user_data = user_data;
420   } else {
421     /* This is to be thread safe. The Association might try to write to the closure already */
422     g_warning ("It is not possible to change receive callback in this state");
423   }
424   g_mutex_unlock (&self->association_mutex);
425 
426   maybe_set_state_to_ready (self);
427 }
428 
429 void
gst_sctp_association_incoming_packet(GstSctpAssociation * self,guint8 * buf,guint32 length)430 gst_sctp_association_incoming_packet (GstSctpAssociation * self, guint8 * buf,
431     guint32 length)
432 {
433   usrsctp_conninput ((void *) self, (const void *) buf, (size_t) length, 0);
434 }
435 
436 gboolean
gst_sctp_association_send_data(GstSctpAssociation * self,guint8 * buf,guint32 length,guint16 stream_id,guint32 ppid,gboolean ordered,GstSctpAssociationPartialReliability pr,guint32 reliability_param)437 gst_sctp_association_send_data (GstSctpAssociation * self, guint8 * buf,
438     guint32 length, guint16 stream_id, guint32 ppid, gboolean ordered,
439     GstSctpAssociationPartialReliability pr, guint32 reliability_param)
440 {
441   struct sctp_sendv_spa spa;
442   gint32 bytes_sent;
443   gboolean result = FALSE;
444   struct sockaddr_conn remote_addr;
445 
446   g_mutex_lock (&self->association_mutex);
447   if (self->state != GST_SCTP_ASSOCIATION_STATE_CONNECTED)
448     goto end;
449 
450   memset (&spa, 0, sizeof (spa));
451 
452   spa.sendv_sndinfo.snd_ppid = g_htonl (ppid);
453   spa.sendv_sndinfo.snd_sid = stream_id;
454   spa.sendv_sndinfo.snd_flags = ordered ? 0 : SCTP_UNORDERED;
455   spa.sendv_sndinfo.snd_context = 0;
456   spa.sendv_sndinfo.snd_assoc_id = 0;
457   spa.sendv_flags = SCTP_SEND_SNDINFO_VALID;
458   if (pr != GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_NONE) {
459     spa.sendv_flags |= SCTP_SEND_PRINFO_VALID;
460     spa.sendv_prinfo.pr_value = g_htonl (reliability_param);
461     if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_TTL)
462       spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_TTL;
463     else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_RTX)
464       spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_RTX;
465     else if (pr == GST_SCTP_ASSOCIATION_PARTIAL_RELIABILITY_BUF)
466       spa.sendv_prinfo.pr_policy = SCTP_PR_SCTP_BUF;
467   }
468 
469   remote_addr = get_sctp_socket_address (self, self->remote_port);
470   bytes_sent =
471       usrsctp_sendv (self->sctp_ass_sock, buf, length,
472       (struct sockaddr *) &remote_addr, 1, (void *) &spa,
473       (socklen_t) sizeof (struct sctp_sendv_spa), SCTP_SENDV_SPA, 0);
474   if (bytes_sent < 0) {
475     if (errno == EAGAIN || errno == EWOULDBLOCK) {
476       /* Resending this buffer is taken care of by the gstsctpenc */
477       goto end;
478     } else {
479       g_warning ("Error sending data on stream %u: (%u) %s", stream_id, errno,
480           strerror (errno));
481       goto end;
482     }
483   }
484 
485   result = TRUE;
486 end:
487   g_mutex_unlock (&self->association_mutex);
488   return result;
489 }
490 
491 
492 void
gst_sctp_association_reset_stream(GstSctpAssociation * self,guint16 stream_id)493 gst_sctp_association_reset_stream (GstSctpAssociation * self, guint16 stream_id)
494 {
495   struct sctp_reset_streams *srs;
496   socklen_t length;
497 
498   length = (socklen_t) (sizeof (struct sctp_reset_streams) + sizeof (guint16));
499   srs = (struct sctp_reset_streams *) g_malloc0 (length);
500   srs->srs_flags = SCTP_STREAM_RESET_OUTGOING;
501   srs->srs_number_streams = 1;
502   srs->srs_stream_list[0] = stream_id;
503 
504   g_mutex_lock (&self->association_mutex);
505   usrsctp_setsockopt (self->sctp_ass_sock, IPPROTO_SCTP, SCTP_RESET_STREAMS,
506       srs, length);
507   g_mutex_unlock (&self->association_mutex);
508 
509   g_free (srs);
510 }
511 
512 void
gst_sctp_association_force_close(GstSctpAssociation * self)513 gst_sctp_association_force_close (GstSctpAssociation * self)
514 {
515   g_mutex_lock (&self->association_mutex);
516   if (self->sctp_ass_sock) {
517     usrsctp_close (self->sctp_ass_sock);
518     self->sctp_ass_sock = NULL;
519 
520   }
521   g_mutex_unlock (&self->association_mutex);
522 }
523 
524 static struct socket *
create_sctp_socket(GstSctpAssociation * self)525 create_sctp_socket (GstSctpAssociation * self)
526 {
527   struct socket *sock;
528   struct linger l;
529   struct sctp_event event;
530   struct sctp_assoc_value stream_reset;
531   int value = 1;
532   guint16 event_types[] = {
533     SCTP_ASSOC_CHANGE,
534     SCTP_PEER_ADDR_CHANGE,
535     SCTP_REMOTE_ERROR,
536     SCTP_SEND_FAILED,
537     SCTP_SHUTDOWN_EVENT,
538     SCTP_ADAPTATION_INDICATION,
539     /*SCTP_PARTIAL_DELIVERY_EVENT, */
540     /*SCTP_AUTHENTICATION_EVENT, */
541     SCTP_STREAM_RESET_EVENT,
542     /*SCTP_SENDER_DRY_EVENT, */
543     /*SCTP_NOTIFICATIONS_STOPPED_EVENT, */
544     /*SCTP_ASSOC_RESET_EVENT, */
545     SCTP_STREAM_CHANGE_EVENT
546   };
547   guint32 i;
548   guint sock_type = self->use_sock_stream ? SOCK_STREAM : SOCK_SEQPACKET;
549 
550   if ((sock =
551           usrsctp_socket (AF_CONN, sock_type, IPPROTO_SCTP, receive_cb, NULL, 0,
552               (void *) self)) == NULL)
553     goto error;
554 
555   if (usrsctp_set_non_blocking (sock, 1) < 0) {
556     g_warning ("Could not set non-blocking mode on SCTP socket");
557     goto error;
558   }
559 
560   memset (&l, 0, sizeof (l));
561   l.l_onoff = 1;
562   l.l_linger = 0;
563   if (usrsctp_setsockopt (sock, SOL_SOCKET, SO_LINGER, (const void *) &l,
564           (socklen_t) sizeof (struct linger)) < 0) {
565     g_warning ("Could not set SO_LINGER on SCTP socket");
566     goto error;
567   }
568 
569   if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_NODELAY, &value,
570           sizeof (int))) {
571     g_warning ("Could not set SCTP_NODELAY");
572     goto error;
573   }
574 
575   memset (&stream_reset, 0, sizeof (stream_reset));
576   stream_reset.assoc_id = SCTP_ALL_ASSOC;
577   stream_reset.assoc_value = 1;
578   if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_ENABLE_STREAM_RESET,
579           &stream_reset, sizeof (stream_reset))) {
580     g_warning ("Could not set SCTP_ENABLE_STREAM_RESET");
581     goto error;
582   }
583 
584   memset (&event, 0, sizeof (event));
585   event.se_assoc_id = SCTP_ALL_ASSOC;
586   event.se_on = 1;
587   for (i = 0; i < sizeof (event_types) / sizeof (event_types[0]); i++) {
588     event.se_type = event_types[i];
589     if (usrsctp_setsockopt (sock, IPPROTO_SCTP, SCTP_EVENT,
590             &event, sizeof (event)) < 0) {
591       g_warning ("Failed to register event %u", event_types[i]);
592     }
593   }
594 
595   return sock;
596 error:
597   if (sock) {
598     usrsctp_close (sock);
599     g_warning ("Could not create socket. Error: (%u) %s", errno,
600         strerror (errno));
601     errno = 0;
602     sock = NULL;
603   }
604   return NULL;
605 }
606 
607 static struct sockaddr_conn
get_sctp_socket_address(GstSctpAssociation * gst_sctp_association,guint16 port)608 get_sctp_socket_address (GstSctpAssociation * gst_sctp_association,
609     guint16 port)
610 {
611   struct sockaddr_conn addr;
612 
613   memset ((void *) &addr, 0, sizeof (struct sockaddr_conn));
614 #ifdef __APPLE__
615   addr.sconn_len = sizeof (struct sockaddr_conn);
616 #endif
617   addr.sconn_family = AF_CONN;
618   addr.sconn_port = g_htons (port);
619   addr.sconn_addr = (void *) gst_sctp_association;
620 
621   return addr;
622 }
623 
624 static gpointer
connection_thread_func(GstSctpAssociation * self)625 connection_thread_func (GstSctpAssociation * self)
626 {
627   /* TODO: Support both server and client role */
628   client_role_connect (self);
629   return NULL;
630 }
631 
632 static gboolean
client_role_connect(GstSctpAssociation * self)633 client_role_connect (GstSctpAssociation * self)
634 {
635   struct sockaddr_conn addr;
636   gint ret;
637 
638   g_mutex_lock (&self->association_mutex);
639   addr = get_sctp_socket_address (self, self->local_port);
640   ret =
641       usrsctp_bind (self->sctp_ass_sock, (struct sockaddr *) &addr,
642       sizeof (struct sockaddr_conn));
643   if (ret < 0) {
644     g_warning ("usrsctp_bind() error: (%u) %s", errno, strerror (errno));
645     goto error;
646   }
647 
648   addr = get_sctp_socket_address (self, self->remote_port);
649   ret =
650       usrsctp_connect (self->sctp_ass_sock, (struct sockaddr *) &addr,
651       sizeof (struct sockaddr_conn));
652   if (ret < 0 && errno != EINPROGRESS) {
653     g_warning ("usrsctp_connect() error: (%u) %s", errno, strerror (errno));
654     goto error;
655   }
656   g_mutex_unlock (&self->association_mutex);
657   return TRUE;
658 error:
659   g_mutex_unlock (&self->association_mutex);
660   return FALSE;
661 }
662 
663 static int
sctp_packet_out(void * addr,void * buffer,size_t length,guint8 tos,guint8 set_df)664 sctp_packet_out (void *addr, void *buffer, size_t length, guint8 tos,
665     guint8 set_df)
666 {
667   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (addr);
668 
669   if (self->packet_out_cb) {
670     self->packet_out_cb (self, buffer, length, self->packet_out_user_data);
671   }
672 
673   return 0;
674 }
675 
676 static int
receive_cb(struct socket * sock,union sctp_sockstore addr,void * data,size_t datalen,struct sctp_rcvinfo rcv_info,gint flags,void * ulp_info)677 receive_cb (struct socket *sock, union sctp_sockstore addr, void *data,
678     size_t datalen, struct sctp_rcvinfo rcv_info, gint flags, void *ulp_info)
679 {
680   GstSctpAssociation *self = GST_SCTP_ASSOCIATION (ulp_info);
681 
682   if (!data) {
683     /* Not sure if this can happend. */
684     g_warning ("Received empty data buffer");
685   } else {
686     if (flags & MSG_NOTIFICATION) {
687       handle_notification (self, (const union sctp_notification *) data,
688           datalen);
689       /* We use this instead of a bare `free()` so that we use the `free` from
690        * the C runtime that usrsctp was built with. This makes a difference on
691        * Windows where libusrstcp and GStreamer can be linked to two different
692        * CRTs. */
693       usrsctp_freedumpbuffer (data);
694     } else {
695       handle_message (self, data, datalen, rcv_info.rcv_sid,
696           ntohl (rcv_info.rcv_ppid));
697     }
698   }
699 
700   return 1;
701 }
702 
703 static void
handle_notification(GstSctpAssociation * self,const union sctp_notification * notification,size_t length)704 handle_notification (GstSctpAssociation * self,
705     const union sctp_notification *notification, size_t length)
706 {
707   g_assert (notification->sn_header.sn_length == length);
708 
709   switch (notification->sn_header.sn_type) {
710     case SCTP_ASSOC_CHANGE:
711       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_ASSOC_CHANGE");
712       handle_association_changed (self, &notification->sn_assoc_change);
713       break;
714     case SCTP_PEER_ADDR_CHANGE:
715       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_PEER_ADDR_CHANGE");
716       break;
717     case SCTP_REMOTE_ERROR:
718       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_REMOTE_ERROR");
719       break;
720     case SCTP_SEND_FAILED:
721       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SEND_FAILED");
722       break;
723     case SCTP_SHUTDOWN_EVENT:
724       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SHUTDOWN_EVENT");
725       break;
726     case SCTP_ADAPTATION_INDICATION:
727       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
728           "Event: SCTP_ADAPTATION_INDICATION");
729       break;
730     case SCTP_PARTIAL_DELIVERY_EVENT:
731       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
732           "Event: SCTP_PARTIAL_DELIVERY_EVENT");
733       break;
734     case SCTP_AUTHENTICATION_EVENT:
735       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
736           "Event: SCTP_AUTHENTICATION_EVENT");
737       break;
738     case SCTP_STREAM_RESET_EVENT:
739       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_STREAM_RESET_EVENT");
740       handle_stream_reset_event (self, &notification->sn_strreset_event);
741       break;
742     case SCTP_SENDER_DRY_EVENT:
743       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SENDER_DRY_EVENT");
744       break;
745     case SCTP_NOTIFICATIONS_STOPPED_EVENT:
746       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
747           "Event: SCTP_NOTIFICATIONS_STOPPED_EVENT");
748       break;
749     case SCTP_ASSOC_RESET_EVENT:
750       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_ASSOC_RESET_EVENT");
751       break;
752     case SCTP_STREAM_CHANGE_EVENT:
753       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_STREAM_CHANGE_EVENT");
754       break;
755     case SCTP_SEND_FAILED_EVENT:
756       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "Event: SCTP_SEND_FAILED_EVENT");
757       break;
758     default:
759       break;
760   }
761 }
762 
763 static void
handle_association_changed(GstSctpAssociation * self,const struct sctp_assoc_change * sac)764 handle_association_changed (GstSctpAssociation * self,
765     const struct sctp_assoc_change *sac)
766 {
767   gboolean change_state = FALSE;
768   GstSctpAssociationState new_state;
769 
770   switch (sac->sac_state) {
771     case SCTP_COMM_UP:
772       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP_COMM_UP()");
773       g_mutex_lock (&self->association_mutex);
774       if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTING) {
775         change_state = TRUE;
776         new_state = GST_SCTP_ASSOCIATION_STATE_CONNECTED;
777         g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO, "SCTP association connected!");
778       } else if (self->state == GST_SCTP_ASSOCIATION_STATE_CONNECTED) {
779         g_warning ("SCTP association already open");
780       } else {
781         g_warning ("SCTP association in unexpected state");
782       }
783       g_mutex_unlock (&self->association_mutex);
784       break;
785     case SCTP_COMM_LOST:
786       g_warning ("SCTP event SCTP_COMM_LOST received");
787       /* TODO: Tear down association and signal that this has happend */
788       break;
789     case SCTP_RESTART:
790       g_log (G_LOG_DOMAIN, G_LOG_LEVEL_INFO,
791           "SCTP event SCTP_RESTART received");
792       break;
793     case SCTP_SHUTDOWN_COMP:
794       g_warning ("SCTP event SCTP_SHUTDOWN_COMP received");
795       /* TODO: Tear down association and signal that this has happend */
796       break;
797     case SCTP_CANT_STR_ASSOC:
798       g_warning ("SCTP event SCTP_CANT_STR_ASSOC received");
799       break;
800   }
801 
802   if (change_state)
803     gst_sctp_association_change_state (self, new_state, TRUE);
804 }
805 
806 static void
handle_stream_reset_event(GstSctpAssociation * self,const struct sctp_stream_reset_event * sr)807 handle_stream_reset_event (GstSctpAssociation * self,
808     const struct sctp_stream_reset_event *sr)
809 {
810   guint32 i, n;
811   if (!(sr->strreset_flags & SCTP_STREAM_RESET_DENIED) &&
812       !(sr->strreset_flags & SCTP_STREAM_RESET_DENIED)) {
813     n = (sr->strreset_length -
814         sizeof (struct sctp_stream_reset_event)) / sizeof (uint16_t);
815     for (i = 0; i < n; i++) {
816       if (sr->strreset_flags & SCTP_STREAM_RESET_INCOMING_SSN) {
817         g_signal_emit (self, signals[SIGNAL_STREAM_RESET], 0,
818             sr->strreset_stream_list[i]);
819       }
820     }
821   }
822 }
823 
824 static void
handle_message(GstSctpAssociation * self,guint8 * data,guint32 datalen,guint16 stream_id,guint32 ppid)825 handle_message (GstSctpAssociation * self, guint8 * data, guint32 datalen,
826     guint16 stream_id, guint32 ppid)
827 {
828   if (self->packet_received_cb) {
829     self->packet_received_cb (self, data, datalen, stream_id, ppid,
830         self->packet_received_user_data);
831   }
832 }
833 
834 static void
gst_sctp_association_change_state(GstSctpAssociation * self,GstSctpAssociationState new_state,gboolean notify)835 gst_sctp_association_change_state (GstSctpAssociation * self,
836     GstSctpAssociationState new_state, gboolean notify)
837 {
838   self->state = new_state;
839   if (notify)
840     g_object_notify_by_pspec (G_OBJECT (self), properties[PROP_STATE]);
841 }
842