1 /* GStreamer
2 * Copyright (C) 2018, Collabora Ltd.
3 * Copyright (C) 2018, SK Telecom, Co., Ltd.
4 * Author: Jeongseok Kim <jeongseok.kim@sk.com>
5 *
6 * This library is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU Library General Public
8 * License as published by the Free Software Foundation; either
9 * version 2 of the License, or (at your option) any later version.
10 *
11 * This library is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * Library General Public License for more details.
15 *
16 * You should have received a copy of the GNU Library General Public
17 * License along with this library; if not, write to the
18 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
19 * Boston, MA 02110-1301, USA.
20 */
21
22 #ifdef HAVE_CONFIG_H
23 #include "config.h"
24 #endif
25
26 /* Needed for GValueArray */
27 #define GLIB_DISABLE_DEPRECATION_WARNINGS
28
29 #include "gstsrtobject.h"
30
31 #include <gst/base/gstbasesink.h>
32 #include <gio/gnetworking.h>
33 #include <stdlib.h>
34
35 GST_DEBUG_CATEGORY_EXTERN (gst_debug_srtobject);
36 #define GST_CAT_DEFAULT gst_debug_srtobject
37
38 enum
39 {
40 PROP_URI = 1,
41 PROP_MODE,
42 PROP_LOCALADDRESS,
43 PROP_LOCALPORT,
44 PROP_PASSPHRASE,
45 PROP_PBKEYLEN,
46 PROP_POLL_TIMEOUT,
47 PROP_LATENCY,
48 PROP_MSG_SIZE,
49 PROP_STATS,
50 PROP_LAST
51 };
52
53 typedef struct
54 {
55 SRTSOCKET sock;
56 gint poll_id;
57 GSocketAddress *sockaddr;
58 gboolean sent_headers;
59 } SRTCaller;
60
61 static SRTCaller *
srt_caller_new(void)62 srt_caller_new (void)
63 {
64 SRTCaller *caller = g_new0 (SRTCaller, 1);
65 caller->sock = SRT_INVALID_SOCK;
66 caller->poll_id = SRT_ERROR;
67 caller->sent_headers = FALSE;
68
69 return caller;
70 }
71
72 static void
srt_caller_free(SRTCaller * caller)73 srt_caller_free (SRTCaller * caller)
74 {
75 g_return_if_fail (caller != NULL);
76
77 g_clear_object (&caller->sockaddr);
78
79 if (caller->sock != SRT_INVALID_SOCK) {
80 srt_close (caller->sock);
81 }
82
83 if (caller->poll_id != SRT_ERROR) {
84 srt_epoll_release (caller->poll_id);
85 }
86
87 g_free (caller);
88 }
89
90 static void
srt_caller_invoke_removed_closure(SRTCaller * caller,GstSRTObject * srtobject)91 srt_caller_invoke_removed_closure (SRTCaller * caller, GstSRTObject * srtobject)
92 {
93 GValue values[2] = { G_VALUE_INIT };
94
95 if (srtobject->caller_removed_closure == NULL) {
96 return;
97 }
98
99 g_value_init (&values[0], G_TYPE_INT);
100 g_value_set_int (&values[0], caller->sock);
101
102 g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
103 g_value_set_object (&values[1], caller->sockaddr);
104
105 g_closure_invoke (srtobject->caller_removed_closure, NULL, 2, values, NULL);
106
107 g_value_unset (&values[0]);
108 g_value_unset (&values[1]);
109 }
110
111 struct srt_constant_params
112 {
113 const gchar *name;
114 gint param;
115 gint val;
116 };
117
118 static struct srt_constant_params srt_params[] = {
119 {"SRTO_SNDSYN", SRTO_SNDSYN, 0}, /* 0: non-blocking */
120 {"SRTO_RCVSYN", SRTO_RCVSYN, 0}, /* 0: non-blocking */
121 {"SRTO_LINGER", SRTO_LINGER, 0},
122 {"SRTO_TSBPMODE", SRTO_TSBPDMODE, 1}, /* Timestamp-based Packet Delivery mode must be enabled */
123 {NULL, -1, -1},
124 };
125
126 static gint srt_init_refcount = 0;
127
128 static gboolean
gst_srt_object_set_common_params(SRTSOCKET sock,GstSRTObject * srtobject,GError ** error)129 gst_srt_object_set_common_params (SRTSOCKET sock, GstSRTObject * srtobject,
130 GError ** error)
131 {
132 struct srt_constant_params *params = srt_params;
133
134 for (; params->name != NULL; params++) {
135 if (srt_setsockopt (sock, 0, params->param, ¶ms->val, sizeof (gint))) {
136 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
137 "failed to set %s (reason: %s)", params->name,
138 srt_getlasterror_str ());
139 return FALSE;
140 }
141 }
142
143 if (srtobject->passphrase != NULL && srtobject->passphrase[0] != '\0') {
144 gint pbkeylen;
145
146 if (srt_setsockopt (sock, 0, SRTO_PASSPHRASE, srtobject->passphrase,
147 strlen (srtobject->passphrase))) {
148 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
149 "failed to set passphrase (reason: %s)", srt_getlasterror_str ());
150
151 return FALSE;
152 }
153
154 if (!gst_structure_get_int (srtobject->parameters, "pbkeylen", &pbkeylen)) {
155 pbkeylen = GST_SRT_DEFAULT_PBKEYLEN;
156 }
157
158 if (srt_setsockopt (sock, 0, SRTO_PBKEYLEN, &pbkeylen, sizeof (int))) {
159 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
160 "failed to set pbkeylen (reason: %s)", srt_getlasterror_str ());
161 return FALSE;
162 }
163 }
164
165 {
166 int latency;
167
168 if (!gst_structure_get_int (srtobject->parameters, "latency", &latency))
169 latency = GST_SRT_DEFAULT_LATENCY;
170 if (srt_setsockopt (sock, 0, SRTO_LATENCY, &latency, sizeof (int))) {
171 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
172 "failed to set latency (reason: %s)", srt_getlasterror_str ());
173 return FALSE;
174 }
175 }
176
177 return TRUE;
178 }
179
180 GstSRTObject *
gst_srt_object_new(GstElement * element)181 gst_srt_object_new (GstElement * element)
182 {
183 GstSRTObject *srtobject;
184
185 if (g_atomic_int_get (&srt_init_refcount) == 0) {
186 GST_DEBUG_OBJECT (element, "Starting up SRT");
187 if (srt_startup () != 0) {
188 g_warning ("Failed to initialize SRT (reason: %s)",
189 srt_getlasterror_str ());
190 }
191 }
192
193 g_atomic_int_inc (&srt_init_refcount);
194
195 srtobject = g_new0 (GstSRTObject, 1);
196 srtobject->element = element;
197 srtobject->parameters = gst_structure_new ("application/x-srt-params",
198 "poll-timeout", G_TYPE_INT, GST_SRT_DEFAULT_POLL_TIMEOUT,
199 "latency", G_TYPE_INT, GST_SRT_DEFAULT_LATENCY,
200 "mode", GST_TYPE_SRT_CONNECTION_MODE, GST_SRT_DEFAULT_MODE, NULL);
201
202 srtobject->sock = SRT_INVALID_SOCK;
203 srtobject->poll_id = srt_epoll_create ();
204 srtobject->listener_sock = SRT_INVALID_SOCK;
205 srtobject->listener_poll_id = SRT_ERROR;
206 srtobject->sent_headers = FALSE;
207
208 g_cond_init (&srtobject->sock_cond);
209 return srtobject;
210 }
211
212 void
gst_srt_object_destroy(GstSRTObject * srtobject)213 gst_srt_object_destroy (GstSRTObject * srtobject)
214 {
215 g_return_if_fail (srtobject != NULL);
216
217 if (srtobject->poll_id != SRT_ERROR) {
218 srt_epoll_release (srtobject->poll_id);
219 srtobject->poll_id = SRT_ERROR;
220 }
221
222 g_cond_clear (&srtobject->sock_cond);
223
224 GST_DEBUG_OBJECT (srtobject->element, "Destroying srtobject");
225 gst_structure_free (srtobject->parameters);
226
227 g_free (srtobject->passphrase);
228
229 if (g_atomic_int_dec_and_test (&srt_init_refcount)) {
230 srt_cleanup ();
231 GST_DEBUG_OBJECT (srtobject->element, "Cleaning up SRT");
232 }
233
234 g_clear_pointer (&srtobject->uri, gst_uri_unref);
235
236 g_free (srtobject);
237 }
238
239 gboolean
gst_srt_object_set_property_helper(GstSRTObject * srtobject,guint prop_id,const GValue * value,GParamSpec * pspec)240 gst_srt_object_set_property_helper (GstSRTObject * srtobject,
241 guint prop_id, const GValue * value, GParamSpec * pspec)
242 {
243 switch (prop_id) {
244 case PROP_URI:{
245 const gchar *uri = g_value_get_string (value);
246 gst_srt_object_set_uri (srtobject, uri, NULL);
247 break;
248 }
249 case PROP_MODE:
250 gst_structure_set_value (srtobject->parameters, "mode", value);
251 break;
252 case PROP_POLL_TIMEOUT:
253 gst_structure_set_value (srtobject->parameters, "poll-timeout", value);
254 break;
255 case PROP_LATENCY:
256 gst_structure_set_value (srtobject->parameters, "latency", value);
257 break;
258 case PROP_LOCALADDRESS:
259 gst_structure_set_value (srtobject->parameters, "localaddress", value);
260 break;
261 case PROP_LOCALPORT:
262 gst_structure_set_value (srtobject->parameters, "localport", value);
263 break;
264 case PROP_PASSPHRASE:
265 g_free (srtobject->passphrase);
266 srtobject->passphrase = g_value_dup_string (value);
267 break;
268 case PROP_PBKEYLEN:
269 gst_structure_set_value (srtobject->parameters, "pbkeylen", value);
270 break;
271 default:
272 return FALSE;
273 }
274 return TRUE;
275 }
276
277 gboolean
gst_srt_object_get_property_helper(GstSRTObject * srtobject,guint prop_id,GValue * value,GParamSpec * pspec)278 gst_srt_object_get_property_helper (GstSRTObject * srtobject,
279 guint prop_id, GValue * value, GParamSpec * pspec)
280 {
281 switch (prop_id) {
282 case PROP_URI:
283 g_value_take_string (value, gst_uri_to_string (srtobject->uri));
284 break;
285 case PROP_MODE:{
286 GstSRTConnectionMode v;
287 if (!gst_structure_get_enum (srtobject->parameters, "mode",
288 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & v)) {
289 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'mode'");
290 v = GST_SRT_CONNECTION_MODE_NONE;
291 }
292 g_value_set_enum (value, v);
293 break;
294 }
295 case PROP_LOCALADDRESS:
296 g_value_set_string (value,
297 gst_structure_get_string (srtobject->parameters, "localaddress"));
298 break;
299 case PROP_LOCALPORT:{
300 guint v;
301 if (!gst_structure_get_uint (srtobject->parameters, "localport", &v)) {
302 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'localport'");
303 v = GST_SRT_DEFAULT_PORT;
304 }
305 g_value_set_uint (value, v);
306 break;
307 }
308 case PROP_PBKEYLEN:{
309 GstSRTKeyLength v;
310 if (!gst_structure_get_enum (srtobject->parameters, "pbkeylen",
311 GST_TYPE_SRT_KEY_LENGTH, (gint *) & v)) {
312 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'pbkeylen'");
313 v = GST_SRT_KEY_LENGTH_NO_KEY;
314 }
315 g_value_set_enum (value, v);
316 break;
317 }
318 case PROP_POLL_TIMEOUT:{
319 gint v;
320 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout", &v)) {
321 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'poll-timeout'");
322 v = GST_SRT_DEFAULT_POLL_TIMEOUT;
323 }
324 g_value_set_int (value, v);
325 break;
326 }
327 case PROP_LATENCY:{
328 gint v;
329 if (!gst_structure_get_int (srtobject->parameters, "latency", &v)) {
330 GST_WARNING_OBJECT (srtobject->element, "Failed to get 'latency'");
331 v = GST_SRT_DEFAULT_LATENCY;
332 }
333 g_value_set_int (value, v);
334 break;
335 }
336 case PROP_STATS:
337 g_value_take_boxed (value, gst_srt_object_get_stats (srtobject));
338 break;
339 default:
340 return FALSE;
341 }
342
343 return TRUE;
344 }
345
346 void
gst_srt_object_install_properties_helper(GObjectClass * gobject_class)347 gst_srt_object_install_properties_helper (GObjectClass * gobject_class)
348 {
349 /**
350 * GstSRTSrc:uri:
351 *
352 * The URI used by SRT connection. User can specify SRT specific options by URI parameters.
353 * Refer to <a href="https://github.com/Haivision/srt/blob/master/docs/stransmit.md#medium-srt">Mediun: SRT</a>
354 */
355 g_object_class_install_property (gobject_class, PROP_URI,
356 g_param_spec_string ("uri", "URI",
357 "URI in the form of srt://address:port", GST_SRT_DEFAULT_URI,
358 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
359 G_PARAM_STATIC_STRINGS));
360
361 /**
362 * GstSRTSrc:mode:
363 *
364 * The SRT connection mode.
365 * This property can be set by URI parameters.
366 */
367 g_object_class_install_property (gobject_class, PROP_MODE,
368 g_param_spec_enum ("mode", "Connection mode",
369 "SRT connection mode", GST_TYPE_SRT_CONNECTION_MODE,
370 GST_SRT_CONNECTION_MODE_CALLER,
371 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
372 G_PARAM_STATIC_STRINGS));
373
374 /**
375 * GstSRTSrc:localaddress:
376 *
377 * The address to bind when #GstSRTSrc:mode is listener or rendezvous.
378 * This property can be set by URI parameters.
379 */
380 g_object_class_install_property (gobject_class, PROP_LOCALADDRESS,
381 g_param_spec_string ("localaddress", "Local address",
382 "Local address to bind", GST_SRT_DEFAULT_LOCALADDRESS,
383 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
384 G_PARAM_STATIC_STRINGS));
385
386 /**
387 * GstSRTSrc:localport:
388 *
389 * The local port to bind when #GstSRTSrc:mode is listener or rendezvous.
390 * This property can be set by URI parameters.
391 */
392 g_object_class_install_property (gobject_class, PROP_LOCALPORT,
393 g_param_spec_uint ("localport", "Local port",
394 "Local port to bind", 0,
395 65535, GST_SRT_DEFAULT_PORT,
396 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
397 G_PARAM_STATIC_STRINGS));
398
399 /**
400 * GstSRTSrc:passphrase:
401 *
402 * The password for the encrypted transmission.
403 * This property can be set by URI parameters.
404 */
405 g_object_class_install_property (gobject_class, PROP_PASSPHRASE,
406 g_param_spec_string ("passphrase", "Passphrase",
407 "Password for the encrypted transmission", "",
408 G_PARAM_WRITABLE | GST_PARAM_MUTABLE_READY | G_PARAM_STATIC_STRINGS));
409
410 /**
411 * GstSRTSrc:pbkeylen:
412 *
413 * The crypto key length.
414 * This property can be set by URI parameters.
415 */
416 g_object_class_install_property (gobject_class, PROP_PBKEYLEN,
417 g_param_spec_enum ("pbkeylen", "Crypto key length",
418 "Crypto key length in bytes", GST_TYPE_SRT_KEY_LENGTH,
419 GST_SRT_DEFAULT_PBKEYLEN,
420 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
421 G_PARAM_STATIC_STRINGS));
422
423 /**
424 * GstSRTSrc:poll-timeout:
425 *
426 * The polling timeout used when srt poll is started.
427 * Even if the default value indicates infinite waiting, it can be cancellable according to #GstState
428 * This property can be set by URI parameters.
429 */
430 g_object_class_install_property (gobject_class, PROP_POLL_TIMEOUT,
431 g_param_spec_int ("poll-timeout", "Poll timeout",
432 "Return poll wait after timeout miliseconds (-1 = infinite)", -1,
433 G_MAXINT32, GST_SRT_DEFAULT_POLL_TIMEOUT,
434 G_PARAM_READWRITE | GST_PARAM_MUTABLE_READY |
435 G_PARAM_STATIC_STRINGS));
436
437 /**
438 * GstSRTSrc:latency:
439 *
440 * The maximum accepted transmission latency.
441 */
442 g_object_class_install_property (gobject_class, PROP_LATENCY,
443 g_param_spec_int ("latency", "latency",
444 "Minimum latency (milliseconds)", 0,
445 G_MAXINT32, GST_SRT_DEFAULT_LATENCY,
446 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
447
448 /**
449 * GstSRTSrc:stats:
450 *
451 * The statistics from SRT.
452 */
453 g_object_class_install_property (gobject_class, PROP_STATS,
454 g_param_spec_boxed ("stats", "Statistics",
455 "SRT Statistics", GST_TYPE_STRUCTURE,
456 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
457
458 }
459
460 static void
gst_srt_object_set_enum_value(GstStructure * s,GType enum_type,gconstpointer key,gconstpointer value)461 gst_srt_object_set_enum_value (GstStructure * s, GType enum_type,
462 gconstpointer key, gconstpointer value)
463 {
464 GEnumClass *enum_class;
465 GEnumValue *enum_value;
466
467 enum_class = g_type_class_ref (enum_type);
468 enum_value = g_enum_get_value_by_nick (enum_class, value);
469
470 if (enum_value) {
471 GValue v = G_VALUE_INIT;
472 g_value_init (&v, enum_type);
473 g_value_set_enum (&v, enum_value->value);
474 gst_structure_set_value (s, key, &v);
475 }
476
477 g_type_class_unref (enum_class);
478 }
479
480 static void
gst_srt_object_set_string_value(GstStructure * s,const gchar * key,const gchar * value)481 gst_srt_object_set_string_value (GstStructure * s, const gchar * key,
482 const gchar * value)
483 {
484 GValue v = G_VALUE_INIT;
485 g_value_init (&v, G_TYPE_STRING);
486 g_value_set_static_string (&v, value);
487 gst_structure_set_value (s, key, &v);
488 g_value_unset (&v);
489 }
490
491 static void
gst_srt_object_set_uint_value(GstStructure * s,const gchar * key,const gchar * value)492 gst_srt_object_set_uint_value (GstStructure * s, const gchar * key,
493 const gchar * value)
494 {
495 GValue v = G_VALUE_INIT;
496 g_value_init (&v, G_TYPE_UINT);
497 g_value_set_uint (&v, (guint) strtoul (value, NULL, 10));
498 gst_structure_set_value (s, key, &v);
499 g_value_unset (&v);
500 }
501
502 static void
gst_srt_object_validate_parameters(GstStructure * s,GstUri * uri)503 gst_srt_object_validate_parameters (GstStructure * s, GstUri * uri)
504 {
505 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
506
507 gst_structure_get_enum (s, "mode", GST_TYPE_SRT_CONNECTION_MODE,
508 (gint *) & connection_mode);
509
510 if (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS ||
511 connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
512 guint local_port;
513 const gchar *local_address = gst_structure_get_string (s, "localaddress");
514
515 if (local_address == NULL) {
516 local_address =
517 gst_uri_get_host (uri) ==
518 NULL ? GST_SRT_DEFAULT_LOCALADDRESS : gst_uri_get_host (uri);
519 gst_srt_object_set_string_value (s, "localaddress", local_address);
520 }
521
522 if (!gst_structure_get_uint (s, "localport", &local_port)) {
523 local_port =
524 gst_uri_get_port (uri) ==
525 GST_URI_NO_PORT ? GST_SRT_DEFAULT_PORT : gst_uri_get_port (uri);
526 gst_structure_set (s, "localport", G_TYPE_UINT, local_port, NULL);
527 }
528 }
529 }
530
531 gboolean
gst_srt_object_set_uri(GstSRTObject * srtobject,const gchar * uri,GError ** err)532 gst_srt_object_set_uri (GstSRTObject * srtobject, const gchar * uri,
533 GError ** err)
534 {
535 GHashTable *query_table = NULL;
536 GHashTableIter iter;
537 gpointer key, value;
538 const char *addr_str;
539
540 if (srtobject->opened) {
541 g_warning
542 ("It's not supported to change the 'uri' property when SRT socket is opened.");
543 g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_STATE,
544 "It's not supported to change the 'uri' property when SRT socket is opened");
545
546 return FALSE;
547 }
548
549 if (!g_str_has_prefix (uri, GST_SRT_DEFAULT_URI_SCHEME)) {
550 g_warning ("Given uri cannot be used for SRT connection.");
551 g_set_error (err, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
552 "Invalid SRT URI scheme");
553 return FALSE;
554 }
555
556 g_clear_pointer (&srtobject->uri, gst_uri_unref);
557 srtobject->uri = gst_uri_from_string (uri);
558
559 query_table = gst_uri_get_query_table (srtobject->uri);
560
561 GST_DEBUG_OBJECT (srtobject->element,
562 "set uri to (host: %s, port: %d) with %d query strings",
563 gst_uri_get_host (srtobject->uri), gst_uri_get_port (srtobject->uri),
564 query_table == NULL ? 0 : g_hash_table_size (query_table));
565
566 addr_str = gst_uri_get_host (srtobject->uri);
567 if (addr_str)
568 gst_srt_object_set_enum_value (srtobject->parameters,
569 GST_TYPE_SRT_CONNECTION_MODE, "mode", "caller");
570 else
571 gst_srt_object_set_enum_value (srtobject->parameters,
572 GST_TYPE_SRT_CONNECTION_MODE, "mode", "listener");
573
574 if (query_table) {
575 g_hash_table_iter_init (&iter, query_table);
576 while (g_hash_table_iter_next (&iter, &key, &value)) {
577 if (!g_strcmp0 ("mode", key)) {
578 gst_srt_object_set_enum_value (srtobject->parameters,
579 GST_TYPE_SRT_CONNECTION_MODE, key, value);
580 } else if (!g_strcmp0 ("localaddress", key)) {
581 gst_srt_object_set_string_value (srtobject->parameters, key, value);
582 } else if (!g_strcmp0 ("localport", key)) {
583 gst_srt_object_set_uint_value (srtobject->parameters, key, value);
584 } else if (!g_strcmp0 ("passphrase", key)) {
585 g_free (srtobject->passphrase);
586 srtobject->passphrase = g_strdup (value);
587 } else if (!g_strcmp0 ("pbkeylen", key)) {
588 gst_srt_object_set_enum_value (srtobject->parameters,
589 GST_TYPE_SRT_KEY_LENGTH, key, value);
590 }
591 }
592
593 g_hash_table_unref (query_table);
594 }
595
596 gst_srt_object_validate_parameters (srtobject->parameters, srtobject->uri);
597
598 return TRUE;
599 }
600
601 static gpointer
thread_func(gpointer data)602 thread_func (gpointer data)
603 {
604 GstSRTObject *srtobject = data;
605 SRTSOCKET caller_sock;
606 union
607 {
608 struct sockaddr_storage ss;
609 struct sockaddr sa;
610 } caller_sa;
611 int caller_sa_len;
612
613 gint poll_timeout;
614
615 SRTSOCKET rsock;
616 gint rsocklen = 1;
617
618 for (;;) {
619 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
620 &poll_timeout)) {
621 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
622 }
623
624 GST_DEBUG_OBJECT (srtobject->element, "Waiting a request from caller");
625
626 if (srt_epoll_wait (srtobject->listener_poll_id, &rsock,
627 &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0) < 0) {
628 gint srt_errno = srt_getlasterror (NULL);
629
630 if (srtobject->listener_poll_id == SRT_ERROR)
631 return NULL;
632 if (srt_errno == SRT_ETIMEOUT) {
633 continue;
634 } else {
635 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, FAILED,
636 ("abort polling: %s", srt_getlasterror_str ()), (NULL));
637 return NULL;
638 }
639 }
640
641 caller_sock =
642 srt_accept (srtobject->listener_sock, &caller_sa.sa, &caller_sa_len);
643
644 if (caller_sock != SRT_INVALID_SOCK) {
645 SRTCaller *caller;
646 gint flag = SRT_EPOLL_ERR;
647
648 caller = srt_caller_new ();
649 caller->sockaddr =
650 g_socket_address_new_from_native (&caller_sa.sa, caller_sa_len);
651 caller->poll_id = srt_epoll_create ();
652 caller->sock = caller_sock;
653
654 if (gst_uri_handler_get_uri_type (GST_URI_HANDLER
655 (srtobject->element)) == GST_URI_SRC) {
656 flag |= SRT_EPOLL_IN;
657 } else {
658 flag |= SRT_EPOLL_OUT;
659 }
660
661 if (srt_epoll_add_usock (caller->poll_id, caller_sock, &flag)) {
662
663 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, SETTINGS,
664 ("%s", srt_getlasterror_str ()), (NULL));
665
666 srt_caller_free (caller);
667
668 /* try-again */
669 continue;
670 }
671
672 GST_OBJECT_LOCK (srtobject->element);
673 srtobject->callers = g_list_append (srtobject->callers, caller);
674 g_cond_signal (&srtobject->sock_cond);
675 GST_OBJECT_UNLOCK (srtobject->element);
676
677 /* notifying caller-added */
678 if (srtobject->caller_added_closure != NULL) {
679 GValue values[2] = { G_VALUE_INIT, G_VALUE_INIT };
680
681 g_value_init (&values[0], G_TYPE_INT);
682 g_value_set_int (&values[0], caller->sock);
683
684 g_value_init (&values[1], G_TYPE_SOCKET_ADDRESS);
685 g_value_set_object (&values[1], caller->sockaddr);
686
687 g_closure_invoke (srtobject->caller_added_closure, NULL, 2, values,
688 NULL);
689
690 g_value_unset (&values[1]);
691 }
692
693 GST_DEBUG_OBJECT (srtobject->element, "Accept to connect");
694
695 if (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element)) ==
696 GST_URI_SRC)
697 return NULL;
698 }
699 }
700 }
701
702 static gboolean
gst_srt_object_wait_connect(GstSRTObject * srtobject,GCancellable * cancellable,gpointer sa,size_t sa_len,GError ** error)703 gst_srt_object_wait_connect (GstSRTObject * srtobject,
704 GCancellable * cancellable, gpointer sa, size_t sa_len, GError ** error)
705 {
706 SRTSOCKET sock = SRT_INVALID_SOCK;
707 const gchar *local_address = NULL;
708 guint local_port = 0;
709 gint sock_flags = SRT_EPOLL_ERR | SRT_EPOLL_IN;
710
711 gpointer bind_sa;
712 gsize bind_sa_len;
713 GSocketAddress *bind_addr;
714
715 gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
716
717 local_address =
718 gst_structure_get_string (srtobject->parameters, "localaddress");
719 if (local_address == NULL)
720 local_address = GST_SRT_DEFAULT_LOCALADDRESS;
721
722 bind_addr = g_inet_socket_address_new_from_string (local_address, local_port);
723 bind_sa_len = g_socket_address_get_native_size (bind_addr);
724 bind_sa = g_alloca (bind_sa_len);
725
726 if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
727 goto failed;
728 }
729
730 g_clear_object (&bind_addr);
731
732 sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
733 if (sock == SRT_INVALID_SOCK) {
734 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
735 srt_getlasterror_str ());
736 goto failed;
737 }
738
739 if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
740 goto failed;
741 }
742
743 GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
744 local_address, local_port);
745
746 if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
747 g_set_error (error, GST_RESOURCE_ERROR,
748 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
749 local_address, local_port, srt_getlasterror_str ());
750 goto failed;
751 }
752
753 if (srt_epoll_add_usock (srtobject->listener_poll_id, sock, &sock_flags)) {
754 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
755 srt_getlasterror_str ());
756 goto failed;
757 }
758
759 GST_DEBUG_OBJECT (srtobject->element, "Starting to listen on bind socket");
760 if (srt_listen (sock, 1) == SRT_ERROR) {
761 g_set_error (error, GST_RESOURCE_ERROR,
762 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot listen on bind socket: %s",
763 srt_getlasterror_str ());
764
765 goto failed;
766 }
767
768 srtobject->listener_sock = sock;
769
770 srtobject->thread =
771 g_thread_try_new ("GstSRTObjectListener", thread_func, srtobject, error);
772
773 if (*error != NULL) {
774 goto failed;
775 }
776
777 return TRUE;
778
779 failed:
780
781 if (srtobject->listener_poll_id != SRT_ERROR) {
782 srt_epoll_release (srtobject->listener_poll_id);
783 }
784
785 if (sock != SRT_INVALID_SOCK) {
786 srt_close (sock);
787 }
788
789 g_clear_object (&bind_addr);
790
791 srtobject->listener_poll_id = SRT_ERROR;
792 srtobject->listener_sock = SRT_INVALID_SOCK;
793
794 return FALSE;
795 }
796
797 static gboolean
gst_srt_object_connect(GstSRTObject * srtobject,GstSRTConnectionMode connection_mode,gpointer sa,size_t sa_len,GError ** error)798 gst_srt_object_connect (GstSRTObject * srtobject,
799 GstSRTConnectionMode connection_mode, gpointer sa, size_t sa_len,
800 GError ** error)
801 {
802 SRTSOCKET sock;
803 gint option_val = -1;
804 gint sock_flags = SRT_EPOLL_ERR;
805 guint local_port = 0;
806 const gchar *local_address = NULL;
807
808 sock = srt_socket (AF_INET, SOCK_DGRAM, 0);
809 if (sock == SRT_INVALID_SOCK) {
810 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_INIT, "%s",
811 srt_getlasterror_str ());
812 goto failed;
813 }
814
815 if (!gst_srt_object_set_common_params (sock, srtobject, error)) {
816 goto failed;
817 }
818
819 switch (gst_uri_handler_get_uri_type (GST_URI_HANDLER (srtobject->element))) {
820 case GST_URI_SRC:
821 option_val = 0;
822 sock_flags |= SRT_EPOLL_IN;
823 break;
824 case GST_URI_SINK:
825 option_val = 1;
826 sock_flags |= SRT_EPOLL_OUT;
827 break;
828 default:
829 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS,
830 "Cannot determine stream direction");
831 goto failed;
832 }
833
834 if (srt_setsockopt (sock, 0, SRTO_SENDER, &option_val, sizeof (gint))) {
835 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
836 srt_getlasterror_str ());
837 goto failed;
838 }
839
840 option_val = (connection_mode == GST_SRT_CONNECTION_MODE_RENDEZVOUS);
841 if (srt_setsockopt (sock, 0, SRTO_RENDEZVOUS, &option_val, sizeof (gint))) {
842 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
843 srt_getlasterror_str ());
844 goto failed;
845 }
846
847 gst_structure_get_uint (srtobject->parameters, "localport", &local_port);
848 local_address =
849 gst_structure_get_string (srtobject->parameters, "localaddress");
850 /* According to SRT norm, bind local address and port if specified */
851 if (local_address != NULL && local_port != 0) {
852 gpointer bind_sa;
853 gsize bind_sa_len;
854
855 GSocketAddress *bind_addr =
856 g_inet_socket_address_new_from_string (local_address,
857 local_port);
858
859 bind_sa_len = g_socket_address_get_native_size (bind_addr);
860 bind_sa = g_alloca (bind_sa_len);
861
862 if (!g_socket_address_to_native (bind_addr, bind_sa, bind_sa_len, error)) {
863 g_clear_object (&bind_addr);
864 goto failed;
865 }
866
867 g_clear_object (&bind_addr);
868
869 GST_DEBUG_OBJECT (srtobject->element, "Binding to %s (port: %d)",
870 local_address, local_port);
871
872 if (srt_bind (sock, bind_sa, bind_sa_len) == SRT_ERROR) {
873 g_set_error (error, GST_RESOURCE_ERROR,
874 GST_RESOURCE_ERROR_OPEN_READ_WRITE, "Cannot bind to %s:%d - %s",
875 local_address, local_port, srt_getlasterror_str ());
876 goto failed;
877 }
878 }
879
880 if (srt_epoll_add_usock (srtobject->poll_id, sock, &sock_flags)) {
881 g_set_error (error, GST_LIBRARY_ERROR, GST_LIBRARY_ERROR_SETTINGS, "%s",
882 srt_getlasterror_str ());
883 goto failed;
884 }
885
886 if (srt_connect (sock, sa, sa_len) == SRT_ERROR) {
887 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ, "%s",
888 srt_getlasterror_str ());
889 goto failed;
890 }
891
892 srtobject->sock = sock;
893
894 return TRUE;
895
896 failed:
897
898 if (srtobject->poll_id != SRT_ERROR) {
899 srt_epoll_release (srtobject->poll_id);
900 }
901
902 if (sock != SRT_INVALID_SOCK) {
903 srt_close (sock);
904 }
905
906 srtobject->poll_id = SRT_ERROR;
907 srtobject->sock = SRT_INVALID_SOCK;
908
909 return FALSE;
910 }
911
912 static gboolean
gst_srt_object_open_connection(GstSRTObject * srtobject,GCancellable * cancellable,GstSRTConnectionMode connection_mode,gpointer sa,size_t sa_len,GError ** error)913 gst_srt_object_open_connection (GstSRTObject * srtobject,
914 GCancellable * cancellable, GstSRTConnectionMode connection_mode,
915 gpointer sa, size_t sa_len, GError ** error)
916 {
917 gboolean ret = FALSE;
918
919 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
920 ret =
921 gst_srt_object_wait_connect (srtobject, cancellable, sa, sa_len, error);
922 } else {
923 ret =
924 gst_srt_object_connect (srtobject, connection_mode, sa, sa_len, error);
925 }
926
927 return ret;
928 }
929
930 gboolean
gst_srt_object_open(GstSRTObject * srtobject,GCancellable * cancellable,GError ** error)931 gst_srt_object_open (GstSRTObject * srtobject, GCancellable * cancellable,
932 GError ** error)
933 {
934 return gst_srt_object_open_full (srtobject, NULL, NULL, cancellable, error);
935 }
936
937 gboolean
gst_srt_object_open_full(GstSRTObject * srtobject,GstSRTObjectCallerAdded caller_added_func,GstSRTObjectCallerRemoved caller_removed_func,GCancellable * cancellable,GError ** error)938 gst_srt_object_open_full (GstSRTObject * srtobject,
939 GstSRTObjectCallerAdded caller_added_func,
940 GstSRTObjectCallerRemoved caller_removed_func,
941 GCancellable * cancellable, GError ** error)
942 {
943 GSocketAddress *socket_address = NULL;
944 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
945
946 gpointer sa;
947 size_t sa_len;
948 const gchar *addr_str;
949
950 srtobject->opened = FALSE;
951
952 if (caller_added_func != NULL) {
953 srtobject->caller_added_closure =
954 g_cclosure_new (G_CALLBACK (caller_added_func), srtobject, NULL);
955 g_closure_set_marshal (srtobject->caller_added_closure,
956 g_cclosure_marshal_generic);
957 }
958
959 if (caller_removed_func != NULL) {
960 srtobject->caller_removed_closure =
961 g_cclosure_new (G_CALLBACK (caller_removed_func), srtobject, NULL);
962 g_closure_set_marshal (srtobject->caller_removed_closure,
963 g_cclosure_marshal_generic);
964 }
965
966 addr_str = gst_uri_get_host (srtobject->uri);
967
968 if (addr_str == NULL) {
969 addr_str = GST_SRT_DEFAULT_LOCALADDRESS;
970 GST_DEBUG_OBJECT (srtobject->element,
971 "Given uri doesn't have hostname or address. Use any (%s) and"
972 " setting listener mode", addr_str);
973 }
974
975 socket_address =
976 g_inet_socket_address_new_from_string (addr_str,
977 gst_uri_get_port (srtobject->uri));
978
979 if (socket_address == NULL) {
980 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
981 "Invalid host");
982 goto out;
983 }
984
985 /* FIXME: Unfortunately, SRT doesn't support IPv6 currently. */
986 if (g_socket_address_get_family (socket_address) != G_SOCKET_FAMILY_IPV4) {
987 g_set_error (error, GST_RESOURCE_ERROR, GST_RESOURCE_ERROR_OPEN_READ,
988 "SRT supports IPv4 only");
989 goto out;
990 }
991
992 sa_len = g_socket_address_get_native_size (socket_address);
993 sa = g_alloca (sa_len);
994
995 if (!g_socket_address_to_native (socket_address, sa, sa_len, error)) {
996 goto out;
997 }
998
999 GST_DEBUG_OBJECT (srtobject->element,
1000 "Opening SRT socket with parameters: %" GST_PTR_FORMAT,
1001 srtobject->parameters);
1002
1003 if (!gst_structure_get_enum (srtobject->parameters,
1004 "mode", GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode)) {
1005 GST_WARNING_OBJECT (srtobject->element,
1006 "Cannot get connection mode information." " Use default mode");
1007 connection_mode = GST_TYPE_SRT_CONNECTION_MODE;
1008 }
1009
1010 srtobject->listener_poll_id = srt_epoll_create ();
1011
1012 srtobject->opened =
1013 gst_srt_object_open_connection
1014 (srtobject, cancellable, connection_mode, sa, sa_len, error);
1015
1016 out:
1017 g_clear_object (&socket_address);
1018
1019 return srtobject->opened;
1020 }
1021
1022 void
gst_srt_object_close(GstSRTObject * srtobject)1023 gst_srt_object_close (GstSRTObject * srtobject)
1024 {
1025 GST_OBJECT_LOCK (srtobject->element);
1026 if (srtobject->poll_id != SRT_ERROR) {
1027 srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1028 }
1029
1030 if (srtobject->sock != SRT_INVALID_SOCK) {
1031
1032 GST_DEBUG_OBJECT (srtobject->element, "Closing SRT socket (0x%x)",
1033 srtobject->sock);
1034
1035 srt_close (srtobject->sock);
1036 srtobject->sock = SRT_INVALID_SOCK;
1037 }
1038
1039 if (srtobject->listener_poll_id != SRT_ERROR) {
1040 srt_epoll_remove_usock (srtobject->listener_poll_id,
1041 srtobject->listener_sock);
1042 srtobject->listener_poll_id = SRT_ERROR;
1043 }
1044 if (srtobject->thread) {
1045 GThread *thread = g_steal_pointer (&srtobject->thread);
1046 GST_OBJECT_UNLOCK (srtobject->element);
1047 g_thread_join (thread);
1048 GST_OBJECT_LOCK (srtobject->element);
1049 }
1050
1051 if (srtobject->listener_sock != SRT_INVALID_SOCK) {
1052 GST_DEBUG_OBJECT (srtobject->element, "Closing SRT listener socket (0x%x)",
1053 srtobject->listener_sock);
1054
1055 srt_close (srtobject->listener_sock);
1056 srtobject->listener_sock = SRT_INVALID_SOCK;
1057 }
1058
1059 if (srtobject->callers) {
1060 GList *callers = g_steal_pointer (&srtobject->callers);
1061 GST_OBJECT_UNLOCK (srtobject->element);
1062 g_list_foreach (callers, (GFunc) srt_caller_invoke_removed_closure,
1063 srtobject);
1064 GST_OBJECT_LOCK (srtobject->element);
1065 g_list_free_full (callers, (GDestroyNotify) srt_caller_free);
1066 }
1067
1068 g_clear_pointer (&srtobject->caller_added_closure, g_closure_unref);
1069 g_clear_pointer (&srtobject->caller_removed_closure, g_closure_unref);
1070
1071 srtobject->opened = FALSE;
1072 GST_OBJECT_UNLOCK (srtobject->element);
1073 }
1074
1075 static gboolean
gst_srt_object_wait_caller(GstSRTObject * srtobject,GCancellable * cancellable,GError ** errorj)1076 gst_srt_object_wait_caller (GstSRTObject * srtobject,
1077 GCancellable * cancellable, GError ** errorj)
1078 {
1079 gboolean ret = FALSE;
1080
1081 GST_DEBUG_OBJECT (srtobject->element, "Waiting connection from caller");
1082
1083 GST_OBJECT_LOCK (srtobject->element);
1084 while (!g_cancellable_is_cancelled (cancellable)) {
1085 ret = (srtobject->callers != NULL);
1086 if (ret)
1087 break;
1088 g_cond_wait (&srtobject->sock_cond,
1089 GST_OBJECT_GET_LOCK (srtobject->element));
1090 }
1091 GST_OBJECT_UNLOCK (srtobject->element);
1092
1093 GST_DEBUG_OBJECT (srtobject->element, "got %s connection", ret ? "a" : "no");
1094
1095 return ret;
1096 }
1097
1098 gssize
gst_srt_object_read(GstSRTObject * srtobject,guint8 * data,gsize size,GCancellable * cancellable,GError ** error)1099 gst_srt_object_read (GstSRTObject * srtobject,
1100 guint8 * data, gsize size, GCancellable * cancellable, GError ** error)
1101 {
1102 gssize len = 0;
1103 gint poll_timeout;
1104 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1105 gint poll_id = SRT_ERROR;
1106
1107 /* Only source element can read data */
1108 g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1109 (srtobject->element)) == GST_URI_SRC, -1);
1110
1111 gst_structure_get_enum (srtobject->parameters, "mode",
1112 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1113
1114 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1115 SRTCaller *caller;
1116
1117 if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1118 return -1;
1119
1120 GST_OBJECT_LOCK (srtobject->element);
1121 caller = srtobject->callers->data;
1122 if (srtobject->callers)
1123 poll_id = caller->poll_id;
1124 GST_OBJECT_UNLOCK (srtobject->element);
1125 if (poll_id == SRT_ERROR)
1126 return 0;
1127 } else {
1128 poll_id = srtobject->poll_id;
1129 }
1130
1131 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1132 &poll_timeout)) {
1133 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1134 }
1135
1136 while (!g_cancellable_is_cancelled (cancellable)) {
1137
1138 SRTSOCKET rsock;
1139 gint rsocklen = 1;
1140 int pollret;
1141
1142 pollret = srt_epoll_wait (poll_id, &rsock,
1143 &rsocklen, 0, 0, poll_timeout, NULL, 0, NULL, 0);
1144 if (pollret < 0) {
1145 gint srt_errno = srt_getlasterror (NULL);
1146
1147 if (srt_errno != SRT_ETIMEOUT) {
1148 return 0;
1149 }
1150 continue;
1151 }
1152
1153 if (rsocklen < 0) {
1154 GST_WARNING_OBJECT (srtobject->element,
1155 "abnormal SRT socket is detected");
1156 srt_close (rsock);
1157 }
1158
1159 switch (srt_getsockstate (rsock)) {
1160 case SRTS_BROKEN:
1161 case SRTS_NONEXIST:
1162 case SRTS_CLOSED:
1163 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1164 /* Caller has been disappeared. */
1165 return 0;
1166 } else {
1167 GST_WARNING_OBJECT (srtobject->element,
1168 "Invalid SRT socket. Trying to reconnect");
1169 gst_srt_object_close (srtobject);
1170 if (!gst_srt_object_open (srtobject, cancellable, error)) {
1171 return -1;
1172 }
1173 continue;
1174 }
1175 case SRTS_CONNECTED:
1176 /* good to go */
1177 break;
1178 default:
1179 /* not-ready */
1180 continue;
1181 }
1182
1183
1184 len = srt_recvmsg (rsock, (char *) (data), size);
1185 break;
1186 }
1187
1188 return len;
1189 }
1190
1191 void
gst_srt_object_wakeup(GstSRTObject * srtobject,GCancellable * cancellable)1192 gst_srt_object_wakeup (GstSRTObject * srtobject, GCancellable * cancellable)
1193 {
1194 GST_DEBUG_OBJECT (srtobject->element, "waking up SRT");
1195
1196 /* Removing all socket descriptors from the monitoring list
1197 * wakes up SRT's threads. We only have one to remove. */
1198 srt_epoll_remove_usock (srtobject->poll_id, srtobject->sock);
1199
1200 /* connection is only waited for in listener mode,
1201 * but there is no harm in raising signal in any case */
1202 GST_OBJECT_LOCK (srtobject->element);
1203 /* however, a race might be harmful ...
1204 * the cancellation is used as 'flushing' flag here,
1205 * so make sure it is so detected by the intended part at proper time */
1206 g_cancellable_cancel (cancellable);
1207 g_cond_signal (&srtobject->sock_cond);
1208 GST_OBJECT_UNLOCK (srtobject->element);
1209 }
1210
1211 static gboolean
gst_srt_object_send_headers(GstSRTObject * srtobject,SRTSOCKET sock,gint poll_id,gint poll_timeout,GstBufferList * headers,GCancellable * cancellable)1212 gst_srt_object_send_headers (GstSRTObject * srtobject, SRTSOCKET sock,
1213 gint poll_id, gint poll_timeout, GstBufferList * headers,
1214 GCancellable * cancellable)
1215 {
1216 guint size, i;
1217
1218 if (!headers)
1219 return TRUE;
1220
1221 size = gst_buffer_list_length (headers);
1222
1223 GST_DEBUG_OBJECT (srtobject->element, "Sending %u stream headers", size);
1224
1225 for (i = 0; i < size; i++) {
1226 SRTSOCKET wsock = sock;
1227 gint wsocklen = 1;
1228
1229 GstBuffer *buffer = gst_buffer_list_get (headers, i);
1230 GstMapInfo mapinfo;
1231
1232 if (g_cancellable_is_cancelled (cancellable)) {
1233 return FALSE;
1234 }
1235
1236 if (poll_id > 0 && srt_epoll_wait (poll_id, 0, 0, &wsock,
1237 &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1238 continue;
1239 }
1240
1241 GST_TRACE_OBJECT (srtobject->element, "sending header %u %" GST_PTR_FORMAT,
1242 i, buffer);
1243
1244 if (!gst_buffer_map (buffer, &mapinfo, GST_MAP_READ)) {
1245 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, READ,
1246 ("Could not map the input stream"), (NULL));
1247 return FALSE;
1248 }
1249
1250 if (srt_sendmsg2 (wsock, (char *) mapinfo.data, mapinfo.size,
1251 0) == SRT_ERROR) {
1252 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1253 ("%s", srt_getlasterror_str ()));
1254 gst_buffer_unmap (buffer, &mapinfo);
1255 return FALSE;
1256 }
1257
1258 gst_buffer_unmap (buffer, &mapinfo);
1259 }
1260
1261 return TRUE;
1262 }
1263
1264 static gssize
gst_srt_object_write_to_callers(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1265 gst_srt_object_write_to_callers (GstSRTObject * srtobject,
1266 GstBufferList * headers,
1267 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1268 {
1269 GList *callers;
1270
1271 GST_OBJECT_LOCK (srtobject->element);
1272 callers = srtobject->callers;
1273 while (callers != NULL) {
1274 gssize len = 0;
1275 const guint8 *msg = mapinfo->data;
1276 gint sent;
1277
1278 SRTCaller *caller = callers->data;
1279 callers = callers->next;
1280
1281 if (g_cancellable_is_cancelled (cancellable)) {
1282 GST_OBJECT_UNLOCK (srtobject->element);
1283 return -1;
1284 }
1285
1286 if (!caller->sent_headers) {
1287 if (!gst_srt_object_send_headers (srtobject, caller->sock, -1,
1288 -1, headers, cancellable)) {
1289 goto err;
1290 }
1291 caller->sent_headers = TRUE;
1292 }
1293
1294 while (len < mapinfo->size) {
1295 gint rest = mapinfo->size - len;
1296 sent = srt_sendmsg2 (caller->sock, (char *) (msg + len), rest, 0);
1297 if (sent < 0) {
1298 goto err;
1299 }
1300 len += sent;
1301 }
1302
1303 continue;
1304
1305 err:
1306 srtobject->callers = g_list_remove (srtobject->callers, caller);
1307 srt_caller_invoke_removed_closure (caller, srtobject);
1308 srt_caller_free (caller);
1309 }
1310
1311 GST_OBJECT_UNLOCK (srtobject->element);
1312
1313 return mapinfo->size;
1314 }
1315
1316 static gssize
gst_srt_object_write_one(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1317 gst_srt_object_write_one (GstSRTObject * srtobject,
1318 GstBufferList * headers,
1319 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1320 {
1321 gssize len = 0;
1322 gint poll_timeout;
1323 const guint8 *msg = mapinfo->data;
1324
1325 if (!gst_structure_get_int (srtobject->parameters, "poll-timeout",
1326 &poll_timeout)) {
1327 poll_timeout = GST_SRT_DEFAULT_POLL_TIMEOUT;
1328 }
1329
1330 if (!srtobject->sent_headers) {
1331 if (!gst_srt_object_send_headers (srtobject, srtobject->sock,
1332 srtobject->poll_id, poll_timeout, headers, cancellable)) {
1333 return -1;
1334 }
1335 srtobject->sent_headers = TRUE;
1336 }
1337
1338 while (len < mapinfo->size) {
1339 SRTSOCKET wsock;
1340 gint wsocklen = 1;
1341
1342 gint sent;
1343 gint rest = mapinfo->size - len;
1344
1345 if (g_cancellable_is_cancelled (cancellable)) {
1346 break;
1347 }
1348
1349 if (srt_epoll_wait (srtobject->poll_id, 0, 0, &wsock,
1350 &wsocklen, poll_timeout, NULL, 0, NULL, 0) < 0) {
1351 continue;
1352 }
1353
1354 switch (srt_getsockstate (wsock)) {
1355 case SRTS_BROKEN:
1356 case SRTS_NONEXIST:
1357 case SRTS_CLOSED:
1358 GST_WARNING_OBJECT (srtobject->element,
1359 "Invalid SRT socket. Trying to reconnect");
1360 gst_srt_object_close (srtobject);
1361 if (!gst_srt_object_open (srtobject, cancellable, error)) {
1362 return -1;
1363 }
1364 continue;
1365 case SRTS_CONNECTED:
1366 /* good to go */
1367 GST_LOG_OBJECT (srtobject->element, "good to go");
1368 break;
1369 default:
1370 GST_WARNING_OBJECT (srtobject->element, "not ready");
1371 /* not-ready */
1372 continue;
1373 }
1374
1375 sent = srt_sendmsg2 (wsock, (char *) (msg + len), rest, 0);
1376 if (sent < 0) {
1377 GST_ELEMENT_ERROR (srtobject->element, RESOURCE, WRITE, NULL,
1378 ("%s", srt_getlasterror_str ()));
1379 break;
1380 }
1381 len += sent;
1382 }
1383
1384 return len;
1385 }
1386
1387 gssize
gst_srt_object_write(GstSRTObject * srtobject,GstBufferList * headers,const GstMapInfo * mapinfo,GCancellable * cancellable,GError ** error)1388 gst_srt_object_write (GstSRTObject * srtobject,
1389 GstBufferList * headers,
1390 const GstMapInfo * mapinfo, GCancellable * cancellable, GError ** error)
1391 {
1392 gssize len = 0;
1393 GstSRTConnectionMode connection_mode = GST_SRT_CONNECTION_MODE_NONE;
1394
1395 /* Only sink element can write data */
1396 g_return_val_if_fail (gst_uri_handler_get_uri_type (GST_URI_HANDLER
1397 (srtobject->element)) == GST_URI_SINK, -1);
1398
1399 gst_structure_get_enum (srtobject->parameters, "mode",
1400 GST_TYPE_SRT_CONNECTION_MODE, (gint *) & connection_mode);
1401
1402 if (connection_mode == GST_SRT_CONNECTION_MODE_LISTENER) {
1403 if (!gst_srt_object_wait_caller (srtobject, cancellable, error))
1404 return -1;
1405
1406 len =
1407 gst_srt_object_write_to_callers (srtobject, headers, mapinfo,
1408 cancellable, error);
1409 } else {
1410 len =
1411 gst_srt_object_write_one (srtobject, headers, mapinfo, cancellable,
1412 error);
1413 }
1414
1415 return len;
1416 }
1417
1418 static GstStructure *
get_stats_for_srtsock(SRTSOCKET srtsock,gboolean is_sender)1419 get_stats_for_srtsock (SRTSOCKET srtsock, gboolean is_sender)
1420 {
1421 GstStructure *s = gst_structure_new_empty ("application/x-srt-statistics");
1422 int ret;
1423 SRT_TRACEBSTATS stats;
1424
1425 ret = srt_bstats (srtsock, &stats, 0);
1426
1427 if (ret >= 0) {
1428 if (is_sender)
1429 gst_structure_set (s,
1430 /* number of sent data packets, including retransmissions */
1431 "packets-sent", G_TYPE_INT64, stats.pktSent,
1432 /* number of lost packets (sender side) */
1433 "packets-sent-lost", G_TYPE_INT, stats.pktSndLoss,
1434 /* number of retransmitted packets */
1435 "packets-retransmitted", G_TYPE_INT, stats.pktRetrans,
1436 /* number of received ACK packets */
1437 "packet-ack-received", G_TYPE_INT, stats.pktRecvACK,
1438 /* number of received NAK packets */
1439 "packet-nack-received", G_TYPE_INT, stats.pktRecvNAK,
1440 /* time duration when UDT is sending data (idle time exclusive) */
1441 "send-duration-us", G_TYPE_INT64, stats.usSndDuration,
1442 /* number of sent data bytes, including retransmissions */
1443 "bytes-sent", G_TYPE_UINT64, stats.byteSent,
1444 /* number of retransmitted bytes */
1445 "bytes-retransmitted", G_TYPE_UINT64, stats.byteRetrans,
1446 /* number of too-late-to-send dropped bytes */
1447 "bytes-sent-dropped", G_TYPE_UINT64, stats.byteSndDrop,
1448 /* number of too-late-to-send dropped packets */
1449 "packets-sent-dropped", G_TYPE_INT, stats.pktSndDrop,
1450 /* sending rate in Mb/s */
1451 "send-rate-mbps", G_TYPE_DOUBLE, stats.mbpsSendRate,
1452 /* busy sending time (i.e., idle time exclusive) */
1453 "send-duration-us", G_TYPE_UINT64, stats.usSndDuration,
1454 "negotiated-latency-ms", G_TYPE_INT, stats.msSndTsbPdDelay, NULL);
1455 else
1456 gst_structure_set (s,
1457 "packets-received", G_TYPE_INT64, stats.pktRecvTotal,
1458 "packets-received-lost", G_TYPE_INT, stats.pktRcvLossTotal,
1459 /* number of sent ACK packets */
1460 "packet-ack-sent", G_TYPE_INT, stats.pktSentACK,
1461 /* number of sent NAK packets */
1462 "packet-nack-sent", G_TYPE_INT, stats.pktSentNAK,
1463 "bytes-received", G_TYPE_UINT64, stats.byteRecvTotal,
1464 "bytes-received-lost", G_TYPE_INT, stats.byteRcvLossTotal,
1465 "receive-rate-mbps", G_TYPE_DOUBLE, stats.mbpsRecvRate,
1466 "negotiated-latency-ms", G_TYPE_INT, stats.msRcvTsbPdDelay, NULL);
1467
1468 gst_structure_set (s,
1469 /* estimated bandwidth, in Mb/s */
1470 "bandwidth-mbps", G_TYPE_DOUBLE, stats.mbpsBandwidth,
1471 "rtt-ms", G_TYPE_DOUBLE, stats.msRTT, NULL);
1472
1473 }
1474
1475 return s;
1476 }
1477
1478 GstStructure *
gst_srt_object_get_stats(GstSRTObject * srtobject)1479 gst_srt_object_get_stats (GstSRTObject * srtobject)
1480 {
1481 GstStructure *s = NULL;
1482 gboolean is_sender = GST_IS_BASE_SINK (srtobject->element);
1483
1484 GST_OBJECT_LOCK (srtobject->element);
1485 if (srtobject->sock != SRT_INVALID_SOCK) {
1486 s = get_stats_for_srtsock (srtobject->sock, is_sender);
1487 goto done;
1488 }
1489
1490 s = gst_structure_new_empty ("application/x-srt-statistics");
1491
1492 if (srtobject->callers) {
1493 GValueArray *callers_stats = g_value_array_new (1);
1494 GValue callers_stats_v = G_VALUE_INIT;
1495 GList *item;
1496
1497 for (item = srtobject->callers; item; item = item->next) {
1498 SRTCaller *caller = item->data;
1499 GstStructure *tmp = get_stats_for_srtsock (caller->sock, is_sender);
1500 GValue *v;
1501
1502 g_value_array_append (callers_stats, NULL);
1503 v = g_value_array_get_nth (callers_stats, callers_stats->n_values - 1);
1504 g_value_init (v, GST_TYPE_STRUCTURE);
1505 g_value_take_boxed (v, tmp);
1506 }
1507
1508 g_value_init (&callers_stats_v, G_TYPE_VALUE_ARRAY);
1509 g_value_take_boxed (&callers_stats_v, callers_stats);
1510 gst_structure_take_value (s, "callers", &callers_stats_v);
1511 }
1512
1513 done:
1514 GST_OBJECT_UNLOCK (srtobject->element);
1515
1516 return s;
1517 }
1518