1 /* GStreamer
2  * Copyright (C) 2018, Collabora Ltd.
3  * Copyright (C) 2018, SK Telecom, Co., Ltd.
4  *   Author: Jeongseok Kim <jeongseok.kim@sk.com>
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19  * Boston, MA 02110-1301, USA.
20  */
21 
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25 
26 /* Needed for GValueArray */
27 #define GLIB_DISABLE_DEPRECATION_WARNINGS
28 
29 #include "gstsrtobject.h"
30 
31 #include <gst/base/gstbasesink.h>
32 #include <gio/gnetworking.h>
33 #include <stdlib.h>
34 
35 GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
36 #define GST_CAT_DEFAULT gst_debug_srtobject
37 
/* Property identifiers. They are the ids passed to
 * g_object_class_install_property() in
 * gst_srt_object_install_properties_helper() and the prop_id values the
 * set/get property helpers switch on. Numbering starts at 1. */
enum
{
  PROP_URI = 1,
  PROP_MODE,
  PROP_LOCALADDRESS,
  PROP_LOCALPORT,
  PROP_PASSPHRASE,
  PROP_PBKEYLEN,
  PROP_POLL_TIMEOUT,
  PROP_LATENCY,
  PROP_MSG_SIZE,
  PROP_STATS,
  PROP_LAST
};
52 
/* State kept for one peer accepted by the listener thread (thread_func).
 * Created with srt_caller_new() and released with srt_caller_free(). */
typedef struct
{
  SRTSOCKET sock;               /* accepted socket; SRT_INVALID_SOCK until assigned */
  gint poll_id;                 /* SRT epoll id for sock; SRT_ERROR until assigned */
  GSocketAddress *sockaddr;     /* peer address; unreffed by srt_caller_free() */
  gboolean sent_headers;        /* starts out FALSE (see srt_caller_new()) */
} SRTCaller;
60 
61 static SRTCaller *
srt_caller_new(void)62 srt_caller_new (void)
63 {
64   SRTCaller *caller = g_new0 (SRTCaller, 1);
65   caller->sock = SRT_INVALID_SOCK;
66   caller->poll_id = SRT_ERROR;
67   caller->sent_headers = FALSE;
68 
69   return caller;
70 }
71 
72 static void
srt_caller_free(SRTCaller * caller)73 srt_caller_free (SRTCaller * caller)
74 {
75   g_return_if_fail (caller != NULL);
76 
77   g_clear_object (&caller->sockaddr);
78 
79   if (caller->sock != SRT_INVALID_SOCK) {
80     srt_close (caller->sock);
81   }
82 
83   if (caller->poll_id != SRT_ERROR) {
84     srt_epoll_release (caller->poll_id);
85   }
86 
87   g_free (caller);
88 }
89 
90 static void
srt_caller_invoke_removed_closure(SRTCaller * caller,GstSRTObject * srtobject)91 srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject)
92 {
93   GValue values[2] = { G_VALUE_INIT };
94 
95   if (srtobject->caller_removed_closure == NULL) {
96     return;
97   }
98 
99   g_value_init (&values[0], G_TYPE_INT);
100   g_value_set_int (&values[0], caller->sock);
101 
102   g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
103   g_value_set_object (&values[1], caller->sockaddr);
104 
105   g_closure_invoke (srtobject->caller_removed_closure, NULL, 2, values, NULL);
106 
107   g_value_unset (&values[0]);
108   g_value_unset (&values[1]);
109 }
110 
/* One fixed socket option: a printable name (used in error messages), the
 * option identifier handed to srt_setsockopt() and the integer value to set. */
struct srt_constant_params
{
  const gchar *name;
  gint param;
  gint val;
};
117 
118 static struct srt_constant_params srt_params[] = {
119   {"SRTO_SNDSYN", SRTO_SNDSYN, 0},      /* 0: non-blocking */
120   {"SRTO_RCVSYN", SRTO_RCVSYN, 0},      /* 0: non-blocking */
121   {"SRTO_LINGER", SRTO_LINGER, 0},
122   {"SRTO_TSBPMODE", SRTO_TSBPDMODE, 1}, /* Timestamp-based Packet Delivery mode must be enabled */
123   {NULL, -1, -1},
124 };
125 
/* Number of live GstSRTObject instances; accessed with g_atomic_int_*().
 * gst_srt_object_new() starts the SRT library up when it is 0 and
 * gst_srt_object_destroy() calls srt_cleanup() when it drops back to 0. */
static gint srt_init_refcount = 0;
127 
128 static gboolean
gst_srt_object_set_common_params(SRTSOCKET sock,GstSRTObject * srtobject,GError ** error)129 gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
130     GError ** error)
131 {
132   struct srt_constant_params *params = srt_params;
133 
134   for (; params->name != NULL; params++) {
135     if (srt_setsockopt (sock, 0, params->param, &params->val, sizeof (gint))) {
136       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
137           "failed to set %s (reason: %s)", params->name,
138           srt_getlasterror_str ());
139       return FALSE;
140     }
141   }
142 
143   if (srtobject->passphrase != NULL && srtobject->passphrase[0] != '\0') {
144     gint pbkeylen;
145 
146     if (srt_setsockopt (sock, 0, SRTO_PASSPHRASE, srtobject->passphrase,
147             strlen (srtobject->passphrase))) {
148       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
149           "failed to set passphrase (reason: %s)", srt_getlasterror_str ());
150 
151       return FALSE;
152     }
153 
154     if (!gst_structure_get_int (srtobject->parameters, "pbkeylen", &pbkeylen)) {
155       pbkeylen = GST_SRT_DEFAULT_PBKEYLEN;
156     }
157 
158     if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) {
159       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
160           "failed to set pbkeylen (reason: %s)", srt_getlasterror_str ());
161       return FALSE;
162     }
163   }
164 
165   {
166     int latency;
167 
168     if (!gst_structure_get_int (srtobject->parameters, "latency", &latency))
169       latency = GST_SRT_DEFAULT_LATENCY;
170     if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) {
171       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
172           "failed to set latency (reason: %s)", srt_getlasterror_str ());
173       return FALSE;
174     }
175   }
176 
177   return TRUE;
178 }
179 
180 GstSRTObject *
gst_srt_object_new(GstElement * element)181 gst_srt_object_new (GstElement * element)
182 {
183   GstSRTObject *srtobject;
184 
185   if (g_atomic_int_get (&srt_init_refcount) == 0) {
186     GST_DEBUG_OBJECT (element, "Starting up SRT");
187     if (srt_startup () != 0) {
188       g_warning ("Failed to initialize SRT (reason: %s)",
189           srt_getlasterror_str ());
190     }
191   }
192 
193   g_atomic_int_inc (&srt_init_refcount);
194 
195   srtobject = g_new0 (GstSRTObject, 1);
196   srtobject->element = element;
197   srtobject->parameters = gst_structure_new ("application/x-srt-params",
198       "poll-timeout", G_TYPE_INT, GST_SRT_DEFAULT_POLL_TIMEOUT,
199       "latency", G_TYPE_INT, GST_SRT_DEFAULT_LATENCY,
200       "mode", GST_TYPE_SRT_CONNECTION_MODE, GST_SRT_DEFAULT_MODE, NULL);
201 
202   srtobject->sock = SRT_INVALID_SOCK;
203   srtobject->poll_id = srt_epoll_create ();
204   srtobject->listener_sock = SRT_INVALID_SOCK;
205   srtobject->listener_poll_id = SRT_ERROR;
206   srtobject->sent_headers = FALSE;
207 
208   g_cond_init (&srtobject->sock_cond);
209   return srtobject;
210 }
211 
212 void
gst_srt_object_destroy(GstSRTObject * srtobject)213 gst_srt_object_destroy (GstSRTObject * srtobject)
214 {
215   g_return_if_fail (srtobject != NULL);
216 
217   if (srtobject->poll_id != SRT_ERROR) {
218     srt_epoll_release (srtobject->poll_id);
219     srtobject->poll_id = SRT_ERROR;
220   }
221 
222   g_cond_clear (&srtobject->sock_cond);
223 
224   GST_DEBUG_OBJECT (srtobject->element, "Destroying srtobject");
225   gst_structure_free (srtobject->parameters);
226 
227   g_free (srtobject->passphrase);
228 
229   if (g_atomic_int_dec_and_test (&srt_init_refcount)) {
230     srt_cleanup ();
231     GST_DEBUG_OBJECT (srtobject->element, "Cleaning up SRT");
232   }
233 
234   g_clear_pointer (&srtobject->uri, gst_uri_unref);
235 
236   g_free (srtobject);
237 }
238 
239 gboolean
gst_srt_object_set_property_helper(GstSRTObject * srtobject,guint prop_id,const GValue * value,GParamSpec * pspec)240 gst_srt_object_set_property_helper (GstSRTObject * srtobject,
241     guint prop_id, const GValue * value, GParamSpec * pspec)
242 {
243   switch (prop_id) {
244     case PROP_URI:{
245       const gchar *uri = g_value_get_string (value);
246       gst_srt_object_set_uri (srtobject, uri, NULL);
247       break;
248     }
249     case PROP_MODE:
250       gst_structure_set_value (srtobject->parameters, "mode", value);
251       break;
252     case PROP_POLL_TIMEOUT:
253       gst_structure_set_value (srtobject->parameters, "poll-timeout", value);
254       break;
255     case PROP_LATENCY:
256       gst_structure_set_value (srtobject->parameters, "latency", value);
257       break;
258     case PROP_LOCALADDRESS:
259       gst_structure_set_value (srtobject->parameters, "localaddress", value);
260       break;
261     case PROP_LOCALPORT:
262       gst_structure_set_value (srtobject->parameters, "localport", value);
263       break;
264     case PROP_PASSPHRASE:
265       g_free (srtobject->passphrase);
266       srtobject->passphrase = g_value_dup_string (value);
267       break;
268     case PROP_PBKEYLEN:
269       gst_structure_set_value (srtobject->parameters, "pbkeylen", value);
270       break;
271     default:
272       return FALSE;
273   }
274   return TRUE;
275 }
276 
277 gboolean
gst_srt_object_get_property_helper(GstSRTObject * srtobject,guint prop_id,GValue * value,GParamSpec * pspec)278 gst_srt_object_get_property_helper (GstSRTObject * srtobject,
279     guint prop_id, GValue * value, GParamSpec * pspec)
280 {
281   switch (prop_id) {
282     case PROP_URI:
283       g_value_take_string (value, gst_uri_to_string (srtobject->uri));
284       break;
285     case PROP_MODE:{
286       GstSRTConnectionMode v;
287       if (!gst_structure_get_enum (srtobject->parameters, "mode",
288               GST_TYPE_SRT_CONNECTION_MODE, (gint *) & v)) {
289         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'mode'");
290         v = GST_SRT_CONNECTION_MODE_NONE;
291       }
292       g_value_set_enum (value, v);
293       break;
294     }
295     case PROP_LOCALADDRESS:
296       g_value_set_string (value,
297           gst_structure_get_string (srtobject->parameters, "localaddress"));
298       break;
299     case PROP_LOCALPORT:{
300       guint v;
301       if (!gst_structure_get_uint (srtobject->parameters, "localport", &v)) {
302         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'localport'");
303         v = GST_SRT_DEFAULT_PORT;
304       }
305       g_value_set_uint (value, v);
306       break;
307     }
308     case PROP_PBKEYLEN:{
309       GstSRTKeyLength v;
310       if (!gst_structure_get_enum (srtobject->parameters, "pbkeylen",
311               GST_TYPE_SRT_KEY_LENGTH, (gint *) & v)) {
312         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'pbkeylen'");
313         v = GST_SRT_KEY_LENGTH_NO_KEY;
314       }
315       g_value_set_enum (value, v);
316       break;
317     }
318     case PROP_POLL_TIMEOUT:{
319       gint v;
320       if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &v)) {
321         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'poll-timeout'");
322         v = GST_SRT_DEFAULT_POLL_TIMEOUT;
323       }
324       g_value_set_int (value, v);
325       break;
326     }
327     case PROP_LATENCY:{
328       gint v;
329       if (!gst_structure_get_int (srtobject->parameters, "latency", &v)) {
330         GST_WARNING_OBJECT (srtobject->element, "Failed to get 'latency'");
331         v = GST_SRT_DEFAULT_LATENCY;
332       }
333       g_value_set_int (value, v);
334       break;
335     }
336     case PROP_STATS:
337       g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
338       break;
339     default:
340       return FALSE;
341   }
342 
343   return TRUE;
344 }
345 
346 void
gst_srt_object_install_properties_helper(GObjectClass * gobject_class)347 gst_srt_object_install_properties_helper (GObjectClass * gobject_class)
348 {
349   /**
350    * GstSRTSrc:uri:
351    *
352    * The URI used by SRT connection. User can specify SRT specific options by URI parameters.
353    * Refer to <a href="https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt">Mediun: SRT</a>
354    */
355   g_object_class_install_property (gobject_class, PROP_URI,
356       g_param_spec_string ("uri", "URI",
357           "URI in the form of srt://address:port", GST_SRT_DEFAULT_URI,
358           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
359           G_PARAM_STATIC_STRINGS));
360 
361   /**
362    * GstSRTSrc:mode:
363    *
364    * The SRT connection mode.
365    * This property can be set by URI parameters.
366    */
367   g_object_class_install_property (gobject_class, PROP_MODE,
368       g_param_spec_enum ("mode", "Connection mode",
369           "SRT connection mode", GST_TYPE_SRT_CONNECTION_MODE,
370           GST_SRT_CONNECTION_MODE_CALLER,
371           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
372           G_PARAM_STATIC_STRINGS));
373 
374   /**
375    * GstSRTSrc:localaddress:
376    *
377    * The address to bind when #GstSRTSrc:mode is listener or rendezvous.
378    * This property can be set by URI parameters.
379    */
380   g_object_class_install_property (gobject_class, PROP_LOCALADDRESS,
381       g_param_spec_string ("localaddress", "Local address",
382           "Local address to bind", GST_SRT_DEFAULT_LOCALADDRESS,
383           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
384           G_PARAM_STATIC_STRINGS));
385 
386   /**
387    * GstSRTSrc:localport:
388    *
389    * The local port to bind when #GstSRTSrc:mode is listener or rendezvous.
390    * This property can be set by URI parameters.
391    */
392   g_object_class_install_property (gobject_class, PROP_LOCALPORT,
393       g_param_spec_uint ("localport", "Local port",
394           "Local port to bind", 0,
395           65535, GST_SRT_DEFAULT_PORT,
396           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
397           G_PARAM_STATIC_STRINGS));
398 
399   /**
400    * GstSRTSrc:passphrase:
401    *
402    * The password for the encrypted transmission.
403    * This property can be set by URI parameters.
404    */
405   g_object_class_install_property (gobject_class, PROP_PASSPHRASE,
406       g_param_spec_string ("passphrase", "Passphrase",
407           "Password for the encrypted transmission", "",
408           G_PARAM_WRITABLE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS));
409 
410   /**
411    * GstSRTSrc:pbkeylen:
412    *
413    * The crypto key length.
414    * This property can be set by URI parameters.
415    */
416   g_object_class_install_property (gobject_class, PROP_PBKEYLEN,
417       g_param_spec_enum ("pbkeylen", "Crypto key length",
418           "Crypto key length in bytes", GST_TYPE_SRT_KEY_LENGTH,
419           GST_SRT_DEFAULT_PBKEYLEN,
420           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
421           G_PARAM_STATIC_STRINGS));
422 
423   /**
424    * GstSRTSrc:poll-timeout:
425    *
426    * The polling timeout used when srt poll is started.
427    * Even if the default value indicates infinite waiting, it can be cancellable according to #GstState
428    * This property can be set by URI parameters.
429    */
430   g_object_class_install_property (gobject_class, PROP_POLL_TIMEOUT,
431       g_param_spec_int ("poll-timeout", "Poll timeout",
432           "Return poll wait after timeout miliseconds (-1 = infinite)", -1,
433           G_MAXINT32, GST_SRT_DEFAULT_POLL_TIMEOUT,
434           G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
435           G_PARAM_STATIC_STRINGS));
436 
437   /**
438    * GstSRTSrc:latency:
439    *
440    * The maximum accepted transmission latency.
441    */
442   g_object_class_install_property (gobject_class, PROP_LATENCY,
443       g_param_spec_int ("latency", "latency",
444           "Minimum latency (milliseconds)", 0,
445           G_MAXINT32, GST_SRT_DEFAULT_LATENCY,
446           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447 
448   /**
449    * GstSRTSrc:stats:
450    *
451    * The statistics from SRT.
452    */
453   g_object_class_install_property (gobject_class, PROP_STATS,
454       g_param_spec_boxed ("stats", "Statistics",
455           "SRT Statistics", GST_TYPE_STRUCTURE,
456           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
457 
458 }
459 
460 static void
gst_srt_object_set_enum_value(GstStructure * s,GType enum_type,gconstpointer key,gconstpointer value)461 gst_srt_object_set_enum_value (GstStructure * s, GType enum_type,
462     gconstpointer key, gconstpointer value)
463 {
464   GEnumClass *enum_class;
465   GEnumValue *enum_value;
466 
467   enum_class = g_type_class_ref (enum_type);
468   enum_value = g_enum_get_value_by_nick (enum_class, value);
469 
470   if (enum_value) {
471     GValue v = G_VALUE_INIT;
472     g_value_init (&v, enum_type);
473     g_value_set_enum (&v, enum_value->value);
474     gst_structure_set_value (s, key, &v);
475   }
476 
477   g_type_class_unref (enum_class);
478 }
479 
480 static void
gst_srt_object_set_string_value(GstStructure * s,const gchar * key,const gchar * value)481 gst_srt_object_set_string_value (GstStructure * s, const gchar * key,
482     const gchar * value)
483 {
484   GValue v = G_VALUE_INIT;
485   g_value_init (&v, G_TYPE_STRING);
486   g_value_set_static_string (&v, value);
487   gst_structure_set_value (s, key, &v);
488   g_value_unset (&v);
489 }
490 
491 static void
gst_srt_object_set_uint_value(GstStructure * s,const gchar * key,const gchar * value)492 gst_srt_object_set_uint_value (GstStructure * s, const gchar * key,
493     const gchar * value)
494 {
495   GValue v = G_VALUE_INIT;
496   g_value_init (&v, G_TYPE_UINT);
497   g_value_set_uint (&v, (guint) strtoul (value, NULL, 10));
498   gst_structure_set_value (s, key, &v);
499   g_value_unset (&v);
500 }
501 
502 static void
gst_srt_object_validate_parameters(GstStructure * s,GstUri * uri)503 gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
504 {
505   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
506 
507   gst_structure_get_enum (s, "mode", GST_TYPE_SRT_CONNECTION_MODE,
508       (gint *) & connection_mode);
509 
510   if (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS ||
511       connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
512     guint local_port;
513     const gchar *local_address = gst_structure_get_string (s, "localaddress");
514 
515     if (local_address == NULL) {
516       local_address =
517           gst_uri_get_host (uri) ==
518           NULL ? GST_SRT_DEFAULT_LOCALADDRESS : gst_uri_get_host (uri);
519       gst_srt_object_set_string_value (s, "localaddress", local_address);
520     }
521 
522     if (!gst_structure_get_uint (s, "localport", &local_port)) {
523       local_port =
524           gst_uri_get_port (uri) ==
525           GST_URI_NO_PORT ? GST_SRT_DEFAULT_PORT : gst_uri_get_port (uri);
526       gst_structure_set (s, "localport", G_TYPE_UINT, local_port, NULL);
527     }
528   }
529 }
530 
531 gboolean
gst_srt_object_set_uri(GstSRTObject * srtobject,const gchar * uri,GError ** err)532 gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
533     GError ** err)
534 {
535   GHashTable *query_table = NULL;
536   GHashTableIter iter;
537   gpointer key, value;
538   const char *addr_str;
539 
540   if (srtobject->opened) {
541     g_warning
542         ("It's not supported to change the 'uri' property when SRT socket is opened.");
543     g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
544         "It's not supported to change the 'uri' property when SRT socket is opened");
545 
546     return FALSE;
547   }
548 
549   if (!g_str_has_prefix (uri, GST_SRT_DEFAULT_URI_SCHEME)) {
550     g_warning ("Given uri cannot be used for SRT connection.");
551     g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
552         "Invalid SRT URI scheme");
553     return FALSE;
554   }
555 
556   g_clear_pointer (&srtobject->uri, gst_uri_unref);
557   srtobject->uri = gst_uri_from_string (uri);
558 
559   query_table = gst_uri_get_query_table (srtobject->uri);
560 
561   GST_DEBUG_OBJECT (srtobject->element,
562       "set uri to (host: %s, port: %d) with %d query strings",
563       gst_uri_get_host (srtobject->uri), gst_uri_get_port (srtobject->uri),
564       query_table == NULL ? 0 : g_hash_table_size (query_table));
565 
566   addr_str = gst_uri_get_host (srtobject->uri);
567   if (addr_str)
568     gst_srt_object_set_enum_value (srtobject->parameters,
569         GST_TYPE_SRT_CONNECTION_MODE, "mode", "caller");
570   else
571     gst_srt_object_set_enum_value (srtobject->parameters,
572         GST_TYPE_SRT_CONNECTION_MODE, "mode", "listener");
573 
574   if (query_table) {
575     g_hash_table_iter_init (&iter, query_table);
576     while (g_hash_table_iter_next (&iter, &key, &value)) {
577       if (!g_strcmp0 ("mode", key)) {
578         gst_srt_object_set_enum_value (srtobject->parameters,
579             GST_TYPE_SRT_CONNECTION_MODE, key, value);
580       } else if (!g_strcmp0 ("localaddress", key)) {
581         gst_srt_object_set_string_value (srtobject->parameters, key, value);
582       } else if (!g_strcmp0 ("localport", key)) {
583         gst_srt_object_set_uint_value (srtobject->parameters, key, value);
584       } else if (!g_strcmp0 ("passphrase", key)) {
585         g_free (srtobject->passphrase);
586         srtobject->passphrase = g_strdup (value);
587       } else if (!g_strcmp0 ("pbkeylen", key)) {
588         gst_srt_object_set_enum_value (srtobject->parameters,
589             GST_TYPE_SRT_KEY_LENGTH, key, value);
590       }
591     }
592 
593     g_hash_table_unref (query_table);
594   }
595 
596   gst_srt_object_validate_parameters (srtobject->parameters, srtobject->uri);
597 
598   return TRUE;
599 }
600 
601 static gpointer
thread_func(gpointer data)602 thread_func (gpointer data)
603 {
604   GstSRTObject *srtobject = data;
605   SRTSOCKET caller_sock;
606   union
607   {
608     struct sockaddr_storage ss;
609     struct sockaddr sa;
610   } caller_sa;
611   int caller_sa_len;
612 
613   gint poll_timeout;
614 
615   SRTSOCKET rsock;
616   gint rsocklen = 1;
617 
618   for (;;) {
619     if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
620             &poll_timeout)) {
621       poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
622     }
623 
624     GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
625 
626     if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
627             &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
628       gint srt_errno = srt_getlasterror (NULL);
629 
630       if (srtobject->listener_poll_id == SRT_ERROR)
631         return NULL;
632       if (srt_errno == SRT_ETIMEOUT) {
633         continue;
634       } else {
635         GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
636             ("abort polling: %s", srt_getlasterror_str ()), (NULL));
637         return NULL;
638       }
639     }
640 
641     caller_sock =
642         srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
643 
644     if (caller_sock != SRT_INVALID_SOCK) {
645       SRTCaller *caller;
646       gint flag = SRT_EPOLL_ERR;
647 
648       caller = srt_caller_new ();
649       caller->sockaddr =
650           g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
651       caller->poll_id = srt_epoll_create ();
652       caller->sock = caller_sock;
653 
654       if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
655               (srtobject->element)) == GST_URI_SRC) {
656         flag |= SRT_EPOLL_IN;
657       } else {
658         flag |= SRT_EPOLL_OUT;
659       }
660 
661       if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
662 
663         GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
664             ("%s", srt_getlasterror_str ()), (NULL));
665 
666         srt_caller_free (caller);
667 
668         /* try-again */
669         continue;
670       }
671 
672       GST_OBJECT_LOCK (srtobject->element);
673       srtobject->callers = g_list_append (srtobject->callers, caller);
674       g_cond_signal (&srtobject->sock_cond);
675       GST_OBJECT_UNLOCK (srtobject->element);
676 
677       /* notifying caller-added */
678       if (srtobject->caller_added_closure != NULL) {
679         GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
680 
681         g_value_init (&values[0], G_TYPE_INT);
682         g_value_set_int (&values[0], caller->sock);
683 
684         g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
685         g_value_set_object (&values[1], caller->sockaddr);
686 
687         g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values,
688             NULL);
689 
690         g_value_unset (&values[1]);
691       }
692 
693       GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
694 
695       if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
696           GST_URI_SRC)
697         return NULL;
698     }
699   }
700 }
701 
702 static gboolean
gst_srt_object_wait_connect(GstSRTObject * srtobject,GCancellable * cancellable,gpointer sa,size_t sa_len,GError ** error)703 gst_srt_object_wait_connect (GstSRTObject * srtobject,
704     GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
705 {
706   SRTSOCKET sock = SRT_INVALID_SOCK;
707   const gchar *local_address = NULL;
708   guint local_port = 0;
709   gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
710 
711   gpointer bind_sa;
712   gsize bind_sa_len;
713   GSocketAddress *bind_addr;
714 
715   gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
716 
717   local_address =
718       gst_structure_get_string (srtobject->parameters, "localaddress");
719   if (local_address == NULL)
720     local_address = GST_SRT_DEFAULT_LOCALADDRESS;
721 
722   bind_addr = g_inet_socket_address_new_from_string (local_address, local_port);
723   bind_sa_len = g_socket_address_get_native_size (bind_addr);
724   bind_sa = g_alloca (bind_sa_len);
725 
726   if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
727     goto failed;
728   }
729 
730   g_clear_object (&bind_addr);
731 
732   sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
733   if (sock == SRT_INVALID_SOCK) {
734     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
735         srt_getlasterror_str ());
736     goto failed;
737   }
738 
739   if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
740     goto failed;
741   }
742 
743   GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
744       local_address, local_port);
745 
746   if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
747     g_set_error (error, GST_RESOURCE_ERROR,
748         GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
749         local_address, local_port, srt_getlasterror_str ());
750     goto failed;
751   }
752 
753   if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
754     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
755         srt_getlasterror_str ());
756     goto failed;
757   }
758 
759   GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
760   if (srt_listen (sock, 1) == SRT_ERROR) {
761     g_set_error (error, GST_RESOURCE_ERROR,
762         GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
763         srt_getlasterror_str ());
764 
765     goto failed;
766   }
767 
768   srtobject->listener_sock = sock;
769 
770   srtobject->thread =
771       g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
772 
773   if (*error != NULL) {
774     goto failed;
775   }
776 
777   return TRUE;
778 
779 failed:
780 
781   if (srtobject->listener_poll_id != SRT_ERROR) {
782     srt_epoll_release (srtobject->listener_poll_id);
783   }
784 
785   if (sock != SRT_INVALID_SOCK) {
786     srt_close (sock);
787   }
788 
789   g_clear_object (&bind_addr);
790 
791   srtobject->listener_poll_id = SRT_ERROR;
792   srtobject->listener_sock = SRT_INVALID_SOCK;
793 
794   return FALSE;
795 }
796 
797 static gboolean
gst_srt_object_connect(GstSRTObject * srtobject,GstSRTConnectionMode connection_mode,gpointer sa,size_t sa_len,GError ** error)798 gst_srt_object_connect (GstSRTObject * srtobject,
799     GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
800     GError ** error)
801 {
802   SRTSOCKET sock;
803   gint option_val = -1;
804   gint sock_flags = SRT_EPOLL_ERR;
805   guint local_port = 0;
806   const gchar *local_address = NULL;
807 
808   sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
809   if (sock == SRT_INVALID_SOCK) {
810     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
811         srt_getlasterror_str ());
812     goto failed;
813   }
814 
815   if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
816     goto failed;
817   }
818 
819   switch (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element))) {
820     case GST_URI_SRC:
821       option_val = 0;
822       sock_flags |= SRT_EPOLL_IN;
823       break;
824     case GST_URI_SINK:
825       option_val = 1;
826       sock_flags |= SRT_EPOLL_OUT;
827       break;
828     default:
829       g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
830           "Cannot determine stream direction");
831       goto failed;
832   }
833 
834   if (srt_setsockopt (sock, 0, SRTO_SENDER, &option_val, sizeof (gint))) {
835     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
836         srt_getlasterror_str ());
837     goto failed;
838   }
839 
840   option_val = (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS);
841   if (srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &option_val, sizeof (gint))) {
842     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
843         srt_getlasterror_str ());
844     goto failed;
845   }
846 
847   gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
848   local_address =
849       gst_structure_get_string (srtobject->parameters, "localaddress");
850   /* According to SRT norm, bind local address and port if specified */
851   if (local_address != NULL && local_port != 0) {
852     gpointer bind_sa;
853     gsize bind_sa_len;
854 
855     GSocketAddress *bind_addr =
856         g_inet_socket_address_new_from_string (local_address,
857         local_port);
858 
859     bind_sa_len = g_socket_address_get_native_size (bind_addr);
860     bind_sa = g_alloca (bind_sa_len);
861 
862     if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
863       g_clear_object (&bind_addr);
864       goto failed;
865     }
866 
867     g_clear_object (&bind_addr);
868 
869     GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
870         local_address, local_port);
871 
872     if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
873       g_set_error (error, GST_RESOURCE_ERROR,
874           GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
875           local_address, local_port, srt_getlasterror_str ());
876       goto failed;
877     }
878   }
879 
880   if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
881     g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
882         srt_getlasterror_str ());
883     goto failed;
884   }
885 
886   if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
887     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, "%s",
888         srt_getlasterror_str ());
889     goto failed;
890   }
891 
892   srtobject->sock = sock;
893 
894   return TRUE;
895 
896 failed:
897 
898   if (srtobject->poll_id != SRT_ERROR) {
899     srt_epoll_release (srtobject->poll_id);
900   }
901 
902   if (sock != SRT_INVALID_SOCK) {
903     srt_close (sock);
904   }
905 
906   srtobject->poll_id = SRT_ERROR;
907   srtobject->sock = SRT_INVALID_SOCK;
908 
909   return FALSE;
910 }
911 
912 static gboolean
gst_srt_object_open_connection(GstSRTObject * srtobject,GCancellable * cancellable,GstSRTConnectionMode connection_mode,gpointer sa,size_t sa_len,GError ** error)913 gst_srt_object_open_connection (GstSRTObject * srtobject,
914     GCancellable * cancellable, GstSRTConnectionMode connection_mode,
915     gpointer sa, size_t sa_len, GError ** error)
916 {
917   gboolean ret = FALSE;
918 
919   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
920     ret =
921         gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len, error);
922   } else {
923     ret =
924         gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error);
925   }
926 
927   return ret;
928 }
929 
930 gboolean
gst_srt_object_open(GstSRTObject * srtobject,GCancellable * cancellable,GError ** error)931 gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
932     GError ** error)
933 {
934   return gst_srt_object_open_full (srtobject, NULL, NULL, cancellable, error);
935 }
936 
937 gboolean
gst_srt_object_open_full(GstSRTObject * srtobject,GstSRTObjectCallerAdded caller_added_func,GstSRTObjectCallerRemoved caller_removed_func,GCancellable * cancellable,GError ** error)938 gst_srt_object_open_full (GstSRTObject * srtobject,
939     GstSRTObjectCallerAdded caller_added_func,
940     GstSRTObjectCallerRemoved caller_removed_func,
941     GCancellable * cancellable, GError ** error)
942 {
943   GSocketAddress *socket_address = NULL;
944   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
945 
946   gpointer sa;
947   size_t sa_len;
948   const gchar *addr_str;
949 
950   srtobject->opened = FALSE;
951 
952   if (caller_added_func != NULL) {
953     srtobject->caller_added_closure =
954         g_cclosure_new (G_CALLBACK (caller_added_func), srtobject, NULL);
955     g_closure_set_marshal (srtobject->caller_added_closure,
956         g_cclosure_marshal_generic);
957   }
958 
959   if (caller_removed_func != NULL) {
960     srtobject->caller_removed_closure =
961         g_cclosure_new (G_CALLBACK (caller_removed_func), srtobject, NULL);
962     g_closure_set_marshal (srtobject->caller_removed_closure,
963         g_cclosure_marshal_generic);
964   }
965 
966   addr_str = gst_uri_get_host (srtobject->uri);
967 
968   if (addr_str == NULL) {
969     addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
970     GST_DEBUG_OBJECT (srtobject->element,
971         "Given uri doesn't have hostname or address. Use any (%s) and"
972         " setting listener mode", addr_str);
973   }
974 
975   socket_address =
976       g_inet_socket_address_new_from_string (addr_str,
977       gst_uri_get_port (srtobject->uri));
978 
979   if (socket_address == NULL) {
980     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
981         "Invalid host");
982     goto out;
983   }
984 
985   /* FIXME: Unfortunately, SRT doesn't support IPv6 currently. */
986   if (g_socket_address_get_family (socket_address) != G_SOCKET_FAMILY_IPV4) {
987     g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
988         "SRT supports IPv4 only");
989     goto out;
990   }
991 
992   sa_len = g_socket_address_get_native_size (socket_address);
993   sa = g_alloca (sa_len);
994 
995   if (!g_socket_address_to_native (socket_address, sa, sa_len, error)) {
996     goto out;
997   }
998 
999   GST_DEBUG_OBJECT (srtobject->element,
1000       "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
1001       srtobject->parameters);
1002 
1003   if (!gst_structure_get_enum (srtobject->parameters,
1004           "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
1005     GST_WARNING_OBJECT (srtobject->element,
1006         "Cannot get connection mode information." " Use default mode");
1007     connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
1008   }
1009 
1010   srtobject->listener_poll_id = srt_epoll_create ();
1011 
1012   srtobject->opened =
1013       gst_srt_object_open_connection
1014       (srtobject, cancellable, connection_mode, sa, sa_len, error);
1015 
1016 out:
1017   g_clear_object (&socket_address);
1018 
1019   return srtobject->opened;
1020 }
1021 
1022 void
gst_srt_object_close(GstSRTObject * srtobject)1023 gst_srt_object_close (GstSRTObject * srtobject)
1024 {
1025   GST_OBJECT_LOCK (srtobject->element);
1026   if (srtobject->poll_id != SRT_ERROR) {
1027     srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1028   }
1029 
1030   if (srtobject->sock != SRT_INVALID_SOCK) {
1031 
1032     GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
1033         srtobject->sock);
1034 
1035     srt_close (srtobject->sock);
1036     srtobject->sock = SRT_INVALID_SOCK;
1037   }
1038 
1039   if (srtobject->listener_poll_id != SRT_ERROR) {
1040     srt_epoll_remove_usock (srtobject->listener_poll_id,
1041         srtobject->listener_sock);
1042     srtobject->listener_poll_id = SRT_ERROR;
1043   }
1044   if (srtobject->thread) {
1045     GThread *thread = g_steal_pointer (&srtobject->thread);
1046     GST_OBJECT_UNLOCK (srtobject->element);
1047     g_thread_join (thread);
1048     GST_OBJECT_LOCK (srtobject->element);
1049   }
1050 
1051   if (srtobject->listener_sock != SRT_INVALID_SOCK) {
1052     GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
1053         srtobject->listener_sock);
1054 
1055     srt_close (srtobject->listener_sock);
1056     srtobject->listener_sock = SRT_INVALID_SOCK;
1057   }
1058 
1059   if (srtobject->callers) {
1060     GList *callers = g_steal_pointer (&srtobject->callers);
1061     GST_OBJECT_UNLOCK (srtobject->element);
1062     g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
1063         srtobject);
1064     GST_OBJECT_LOCK (srtobject->element);
1065     g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
1066   }
1067 
1068   g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
1069   g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
1070 
1071   srtobject->opened = FALSE;
1072   GST_OBJECT_UNLOCK (srtobject->element);
1073 }
1074 
1075 static gboolean
gst_srt_object_wait_caller(GstSRTObject * srtobject,GCancellable * cancellable,GError ** errorj)1076 gst_srt_object_wait_caller (GstSRTObject * srtobject,
1077     GCancellable * cancellable, GError ** errorj)
1078 {
1079   gboolean ret = FALSE;
1080 
1081   GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller");
1082 
1083   GST_OBJECT_LOCK (srtobject->element);
1084   while (!g_cancellable_is_cancelled (cancellable)) {
1085     ret = (srtobject->callers != NULL);
1086     if (ret)
1087       break;
1088     g_cond_wait (&srtobject->sock_cond,
1089         GST_OBJECT_GET_LOCK (srtobject->element));
1090   }
1091   GST_OBJECT_UNLOCK (srtobject->element);
1092 
1093   GST_DEBUG_OBJECT (srtobject->element, "got %s connection", ret ? "a" : "no");
1094 
1095   return ret;
1096 }
1097 
1098 gssize
gst_srt_object_read(GstSRTObject * srtobject,guint8 * data,gsize size,GCancellable * cancellable,GError ** error)1099 gst_srt_object_read (GstSRTObject * srtobject,
1100     guint8 * data, gsize size, GCancellable * cancellable, GError ** error)
1101 {
1102   gssize len = 0;
1103   gint poll_timeout;
1104   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1105   gint poll_id = SRT_ERROR;
1106 
1107   /* Only source element can read data */
1108   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1109           (srtobject->element)) == GST_URI_SRC, -1);
1110 
1111   gst_structure_get_enum (srtobject->parameters, "mode",
1112       GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1113 
1114   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1115     SRTCaller *caller;
1116 
1117     if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1118       return -1;
1119 
1120     GST_OBJECT_LOCK (srtobject->element);
1121     caller = srtobject->callers->data;
1122     if (srtobject->callers)
1123       poll_id = caller->poll_id;
1124     GST_OBJECT_UNLOCK (srtobject->element);
1125     if (poll_id == SRT_ERROR)
1126       return 0;
1127   } else {
1128     poll_id = srtobject->poll_id;
1129   }
1130 
1131   if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1132           &poll_timeout)) {
1133     poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1134   }
1135 
1136   while (!g_cancellable_is_cancelled (cancellable)) {
1137 
1138     SRTSOCKET rsock;
1139     gint rsocklen = 1;
1140     int pollret;
1141 
1142     pollret = srt_epoll_wait (poll_id, &rsock,
1143         &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0);
1144     if (pollret < 0) {
1145       gint srt_errno = srt_getlasterror (NULL);
1146 
1147       if (srt_errno != SRT_ETIMEOUT) {
1148         return 0;
1149       }
1150       continue;
1151     }
1152 
1153     if (rsocklen < 0) {
1154       GST_WARNING_OBJECT (srtobject->element,
1155           "abnormal SRT socket is detected");
1156       srt_close (rsock);
1157     }
1158 
1159     switch (srt_getsockstate (rsock)) {
1160       case SRTS_BROKEN:
1161       case SRTS_NONEXIST:
1162       case SRTS_CLOSED:
1163         if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1164           /* Caller has been disappeared. */
1165           return 0;
1166         } else {
1167           GST_WARNING_OBJECT (srtobject->element,
1168               "Invalid SRT socket. Trying to reconnect");
1169           gst_srt_object_close (srtobject);
1170           if (!gst_srt_object_open (srtobject, cancellable, error)) {
1171             return -1;
1172           }
1173           continue;
1174         }
1175       case SRTS_CONNECTED:
1176         /* good to go */
1177         break;
1178       default:
1179         /* not-ready */
1180         continue;
1181     }
1182 
1183 
1184     len = srt_recvmsg (rsock, (char *) (data), size);
1185     break;
1186   }
1187 
1188   return len;
1189 }
1190 
1191 void
gst_srt_object_wakeup(GstSRTObject * srtobject,GCancellable * cancellable)1192 gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
1193 {
1194   GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
1195 
1196   /* Removing all socket descriptors from the monitoring list
1197    * wakes up SRT's threads. We only have one to remove. */
1198   srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1199 
1200   /* connection is only waited for in listener mode,
1201    * but there is no harm in raising signal in any case */
1202   GST_OBJECT_LOCK (srtobject->element);
1203   /* however, a race might be harmful ...
1204    * the cancellation is used as 'flushing' flag here,
1205    * so make sure it is so detected by the intended part at proper time */
1206   g_cancellable_cancel (cancellable);
1207   g_cond_signal (&srtobject->sock_cond);
1208   GST_OBJECT_UNLOCK (srtobject->element);
1209 }
1210 
1211 static gboolean
gst_srt_object_send_headers(GstSRTObject * srtobject,SRTSOCKET sock,gint poll_id,gint poll_timeout,GstBufferList * headers,GCancellable * cancellable)1212 gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
1213     gint poll_id, gint poll_timeout, GstBufferList * headers,
1214     GCancellable * cancellable)
1215 {
1216   guint size, i;
1217 
1218   if (!headers)
1219     return TRUE;
1220 
1221   size = gst_buffer_list_length (headers);
1222 
1223   GST_DEBUG_OBJECT (srtobject->element, "Sending %u stream headers", size);
1224 
1225   for (i = 0; i < size; i++) {
1226     SRTSOCKET wsock = sock;
1227     gint wsocklen = 1;
1228 
1229     GstBuffer *buffer = gst_buffer_list_get (headers, i);
1230     GstMapInfo mapinfo;
1231 
1232     if (g_cancellable_is_cancelled (cancellable)) {
1233       return FALSE;
1234     }
1235 
1236     if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
1237             &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1238       continue;
1239     }
1240 
1241     GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
1242         i, buffer);
1243 
1244     if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
1245       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
1246           ("Could not map the input stream"), (NULL));
1247       return FALSE;
1248     }
1249 
1250     if (srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size,
1251             0) == SRT_ERROR) {
1252       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1253           ("%s", srt_getlasterror_str ()));
1254       gst_buffer_unmap (buffer, &mapinfo);
1255       return FALSE;
1256     }
1257 
1258     gst_buffer_unmap (buffer, &mapinfo);
1259   }
1260 
1261   return TRUE;
1262 }
1263 
1264 static gssize
gst_srt_object_write_to_callers(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1265 gst_srt_object_write_to_callers (GstSRTObject * srtobject,
1266     GstBufferList * headers,
1267     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1268 {
1269   GList *callers;
1270 
1271   GST_OBJECT_LOCK (srtobject->element);
1272   callers = srtobject->callers;
1273   while (callers != NULL) {
1274     gssize len = 0;
1275     const guint8 *msg = mapinfo->data;
1276     gint sent;
1277 
1278     SRTCaller *caller = callers->data;
1279     callers = callers->next;
1280 
1281     if (g_cancellable_is_cancelled (cancellable)) {
1282       GST_OBJECT_UNLOCK (srtobject->element);
1283       return -1;
1284     }
1285 
1286     if (!caller->sent_headers) {
1287       if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
1288               -1, headers, cancellable)) {
1289         goto err;
1290       }
1291       caller->sent_headers = TRUE;
1292     }
1293 
1294     while (len < mapinfo->size) {
1295       gint rest = mapinfo->size - len;
1296       sent = srt_sendmsg2 (caller->sock, (char *) (msg + len), rest, 0);
1297       if (sent < 0) {
1298         goto err;
1299       }
1300       len += sent;
1301     }
1302 
1303     continue;
1304 
1305   err:
1306     srtobject->callers = g_list_remove (srtobject->callers, caller);
1307     srt_caller_invoke_removed_closure (caller, srtobject);
1308     srt_caller_free (caller);
1309   }
1310 
1311   GST_OBJECT_UNLOCK (srtobject->element);
1312 
1313   return mapinfo->size;
1314 }
1315 
1316 static gssize
gst_srt_object_write_one(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1317 gst_srt_object_write_one (GstSRTObject * srtobject,
1318     GstBufferList * headers,
1319     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1320 {
1321   gssize len = 0;
1322   gint poll_timeout;
1323   const guint8 *msg = mapinfo->data;
1324 
1325   if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1326           &poll_timeout)) {
1327     poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1328   }
1329 
1330   if (!srtobject->sent_headers) {
1331     if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
1332             srtobject->poll_id, poll_timeout, headers, cancellable)) {
1333       return -1;
1334     }
1335     srtobject->sent_headers = TRUE;
1336   }
1337 
1338   while (len < mapinfo->size) {
1339     SRTSOCKET wsock;
1340     gint wsocklen = 1;
1341 
1342     gint sent;
1343     gint rest = mapinfo->size - len;
1344 
1345     if (g_cancellable_is_cancelled (cancellable)) {
1346       break;
1347     }
1348 
1349     if (srt_epoll_wait (srtobject->poll_id, 0, 0, &wsock,
1350             &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1351       continue;
1352     }
1353 
1354     switch (srt_getsockstate (wsock)) {
1355       case SRTS_BROKEN:
1356       case SRTS_NONEXIST:
1357       case SRTS_CLOSED:
1358         GST_WARNING_OBJECT (srtobject->element,
1359             "Invalid SRT socket. Trying to reconnect");
1360         gst_srt_object_close (srtobject);
1361         if (!gst_srt_object_open (srtobject, cancellable, error)) {
1362           return -1;
1363         }
1364         continue;
1365       case SRTS_CONNECTED:
1366         /* good to go */
1367         GST_LOG_OBJECT (srtobject->element, "good to go");
1368         break;
1369       default:
1370         GST_WARNING_OBJECT (srtobject->element, "not ready");
1371         /* not-ready */
1372         continue;
1373     }
1374 
1375     sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
1376     if (sent < 0) {
1377       GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1378           ("%s", srt_getlasterror_str ()));
1379       break;
1380     }
1381     len += sent;
1382   }
1383 
1384   return len;
1385 }
1386 
1387 gssize
gst_srt_object_write(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1388 gst_srt_object_write (GstSRTObject * srtobject,
1389     GstBufferList * headers,
1390     const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1391 {
1392   gssize len = 0;
1393   GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1394 
1395   /* Only sink element can write data */
1396   g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1397           (srtobject->element)) == GST_URI_SINK, -1);
1398 
1399   gst_structure_get_enum (srtobject->parameters, "mode",
1400       GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1401 
1402   if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1403     if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1404       return -1;
1405 
1406     len =
1407         gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
1408         cancellable, error);
1409   } else {
1410     len =
1411         gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
1412         error);
1413   }
1414 
1415   return len;
1416 }
1417 
1418 static GstStructure *
get_stats_for_srtsock(SRTSOCKET srtsock,gboolean is_sender)1419 get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
1420 {
1421   GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
1422   int ret;
1423   SRT_TRACEBSTATS stats;
1424 
1425   ret = srt_bstats (srtsock, &stats, 0);
1426 
1427   if (ret >= 0) {
1428     if (is_sender)
1429       gst_structure_set (s,
1430           /* number of sent data packets, including retransmissions */
1431           "packets-sent", G_TYPE_INT64, stats.pktSent,
1432           /* number of lost packets (sender side) */
1433           "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
1434           /* number of retransmitted packets */
1435           "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
1436           /* number of received ACK packets */
1437           "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
1438           /* number of received NAK packets */
1439           "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
1440           /* time duration when UDT is sending data (idle time exclusive) */
1441           "send-duration-us", G_TYPE_INT64, stats.usSndDuration,
1442           /* number of sent data bytes, including retransmissions */
1443           "bytes-sent", G_TYPE_UINT64, stats.byteSent,
1444           /* number of retransmitted bytes */
1445           "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
1446           /* number of too-late-to-send dropped bytes */
1447           "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
1448           /* number of too-late-to-send dropped packets */
1449           "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
1450           /* sending rate in Mb/s */
1451           "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
1452           /* busy sending time (i.e., idle time exclusive) */
1453           "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
1454           "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
1455     else
1456       gst_structure_set (s,
1457           "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
1458           "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
1459           /* number of sent ACK packets */
1460           "packet-ack-sent", G_TYPE_INT, stats.pktSentACK,
1461           /* number of sent NAK packets */
1462           "packet-nack-sent", G_TYPE_INT, stats.pktSentNAK,
1463           "bytes-received", G_TYPE_UINT64, stats.byteRecvTotal,
1464           "bytes-received-lost", G_TYPE_INT, stats.byteRcvLossTotal,
1465           "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
1466           "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
1467 
1468     gst_structure_set (s,
1469         /* estimated bandwidth, in Mb/s */
1470         "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
1471         "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, NULL);
1472 
1473   }
1474 
1475   return s;
1476 }
1477 
1478 GstStructure *
gst_srt_object_get_stats(GstSRTObject * srtobject)1479 gst_srt_object_get_stats (GstSRTObject * srtobject)
1480 {
1481   GstStructure *s = NULL;
1482   gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
1483 
1484   GST_OBJECT_LOCK (srtobject->element);
1485   if (srtobject->sock != SRT_INVALID_SOCK) {
1486     s = get_stats_for_srtsock (srtobject->sock, is_sender);
1487     goto done;
1488   }
1489 
1490   s = gst_structure_new_empty ("application/x-srt-statistics");
1491 
1492   if (srtobject->callers) {
1493     GValueArray *callers_stats = g_value_array_new (1);
1494     GValue callers_stats_v = G_VALUE_INIT;
1495     GList *item;
1496 
1497     for (item = srtobject->callers; item; item = item->next) {
1498       SRTCaller *caller = item->data;
1499       GstStructure *tmp = get_stats_for_srtsock (caller->sock, is_sender);
1500       GValue *v;
1501 
1502       g_value_array_append (callers_stats, NULL);
1503       v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1);
1504       g_value_init (v, GST_TYPE_STRUCTURE);
1505       g_value_take_boxed (v, tmp);
1506     }
1507 
1508     g_value_init (&callers_stats_v, G_TYPE_VALUE_ARRAY);
1509     g_value_take_boxed (&callers_stats_v, callers_stats);
1510     gst_structure_take_value (s, "callers", &callers_stats_v);
1511   }
1512 
1513 done:
1514   GST_OBJECT_UNLOCK (srtobject->element);
1515 
1516   return s;
1517 }
1518