1 /*
2 * GPAC - Multimedia Framework C SDK
3 *
4 * Authors: Jean Le Feuvre
5 * Copyright (c) Telecom ParisTech 2017-2019
6 * All rights reserved
7 *
8 * This file is part of GPAC / generic TCP/UDP input filter
9 *
10 * GPAC is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation; either version 2, or (at your option)
13 * any later version.
14 *
15 * GPAC is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; see the file COPYING. If not, write to
22 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 *
24 */
25
26
27 #include <gpac/filters.h>
28 #include <gpac/constants.h>
29 #include <gpac/network.h>
30
31 #ifndef GPAC_DISABLE_STREAMING
32 #include <gpac/internal/ietf_dev.h>
33 #endif
34
35 typedef struct
36 {
37 GF_FilterPid *pid;
38 GF_Socket *socket;
39 Bool pck_out;
40 #ifndef GPAC_DISABLE_STREAMING
41 GF_RTPReorder *rtp_reorder;
42 #else
43 Bool is_rtp;
44 #endif
45 char address[GF_MAX_IP_NAME_LEN];
46
47 u64 start_time;
48 u64 nb_bytes;
49 Bool done;
50
51 } GF_SockInClient;
52
53 typedef struct
54 {
55 //options
56 const char *src;
57 u32 block_size, sockbuf;
58 u32 port, maxc;
59 char *ifce;
60 const char *ext;
61 const char *mime;
62 Bool tsprobe, listen, ka, block;
63 u32 timeout;
64 #ifndef GPAC_DISABLE_STREAMING
65 u32 reorder_pck;
66 u32 reorder_delay;
67 #endif
68
69 GF_SockInClient sock_c;
70 GF_List *clients;
71 Bool had_clients;
72 Bool is_udp;
73
74 char *buffer;
75
76 GF_SockGroup *active_sockets;
77 u64 last_rcv_time;
78 } GF_SockInCtx;
79
80
81
sockin_initialize(GF_Filter * filter)82 static GF_Err sockin_initialize(GF_Filter *filter)
83 {
84 char *str, *url;
85 u16 port;
86 u32 sock_type = 0;
87 GF_Err e = GF_OK;
88 GF_SockInCtx *ctx = (GF_SockInCtx *) gf_filter_get_udta(filter);
89
90 if (!ctx || !ctx->src) return GF_BAD_PARAM;
91
92 ctx->active_sockets = gf_sk_group_new();
93 if (!ctx->active_sockets) return GF_OUT_OF_MEM;
94
95 if (!strnicmp(ctx->src, "udp://", 6)) {
96 sock_type = GF_SOCK_TYPE_UDP;
97 ctx->listen = GF_FALSE;
98 ctx->is_udp = GF_TRUE;
99 } else if (!strnicmp(ctx->src, "tcp://", 6)) {
100 sock_type = GF_SOCK_TYPE_TCP;
101 #ifdef GPAC_HAS_SOCK_UN
102 } else if (!strnicmp(ctx->src, "tcpu://", 7) ) {
103 sock_type = GF_SOCK_TYPE_TCP_UN;
104 } else if (!strnicmp(ctx->src, "udpu://", 7) ) {
105 sock_type = GF_SOCK_TYPE_UDP_UN;
106 ctx->listen = GF_FALSE;
107 #endif
108 } else {
109 return GF_NOT_SUPPORTED;
110 }
111
112 url = strchr(ctx->src, ':');
113 url += 3;
114
115 ctx->sock_c.socket = gf_sk_new(sock_type);
116 if (! ctx->sock_c.socket ) {
117 GF_LOG(GF_LOG_ERROR, GF_LOG_NETWORK, ("[SockIn] Failed to open socket for %s\n", ctx->src));
118 return GF_IO_ERR;
119 }
120 gf_sk_group_register(ctx->active_sockets, ctx->sock_c.socket);
121
122 /*setup port and src*/
123 port = ctx->port;
124 str = strrchr(url, ':');
125 /*take care of IPv6 address*/
126 if (str && strchr(str, ']')) str = strchr(url, ':');
127 if (str) {
128 port = atoi(str+1);
129 str[0] = 0;
130 }
131
132 /*do we have a source ?*/
133 if (gf_sk_is_multicast_address(url)) {
134 e = gf_sk_setup_multicast(ctx->sock_c.socket, url, port, 0, 0, ctx->ifce);
135 ctx->listen = GF_FALSE;
136 } else if ((sock_type==GF_SOCK_TYPE_UDP)
137 #ifdef GPAC_HAS_SOCK_UN
138 || (sock_type==GF_SOCK_TYPE_UDP_UN)
139 #endif
140 ) {
141 e = gf_sk_bind(ctx->sock_c.socket, ctx->ifce, port, url, port, GF_SOCK_REUSE_PORT);
142 ctx->listen = GF_FALSE;
143 if (!e)
144 e = gf_sk_connect(ctx->sock_c.socket, url, port, NULL);
145 } else if (ctx->listen) {
146 e = gf_sk_bind(ctx->sock_c.socket, NULL, port, url, 0, GF_SOCK_REUSE_PORT);
147 if (!e)
148 e = gf_sk_listen(ctx->sock_c.socket, ctx->maxc);
149 if (!e) {
150 gf_filter_post_process_task(filter);
151 gf_sk_server_mode(ctx->sock_c.socket, GF_TRUE);
152 }
153
154 } else {
155 e = gf_sk_connect(ctx->sock_c.socket, url, port, ctx->ifce);
156 }
157
158 if (str) str[0] = ':';
159
160 if (e) {
161 gf_sk_del(ctx->sock_c.socket);
162 ctx->sock_c.socket = NULL;
163 return e;
164 }
165
166 gf_sk_set_buffer_size(ctx->sock_c.socket, 0, ctx->sockbuf);
167 gf_sk_set_block_mode(ctx->sock_c.socket, !ctx->block);
168
169
170 GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] opening %s%s\n", ctx->src, ctx->listen ? " in server mode" : ""));
171
172 if (ctx->block_size<2000)
173 ctx->block_size = 2000;
174 ctx->buffer = gf_malloc(ctx->block_size + 1);
175 if (!ctx->buffer) return GF_OUT_OF_MEM;
176 //ext/mime given and not mpeg2, disable probe
177 if (ctx->ext && !strstr("ts|m2t|mts|dmb|trp", ctx->ext)) ctx->tsprobe = GF_FALSE;
178 if (ctx->mime && !strstr(ctx->mime, "mpeg-2") && !strstr(ctx->mime, "mp2t")) ctx->tsprobe = GF_FALSE;
179
180 if (ctx->listen) {
181 ctx->clients = gf_list_new();
182 if (!ctx->clients) return GF_OUT_OF_MEM;
183 }
184 return GF_OK;
185 }
186
sockin_client_reset(GF_SockInClient * sc)187 static void sockin_client_reset(GF_SockInClient *sc)
188 {
189 if (sc->socket) gf_sk_del(sc->socket);
190 #ifndef GPAC_DISABLE_STREAMING
191 if (sc->rtp_reorder) gf_rtp_reorderer_del(sc->rtp_reorder);
192 #endif
193 }
194
sockin_finalize(GF_Filter * filter)195 static void sockin_finalize(GF_Filter *filter)
196 {
197 GF_SockInCtx *ctx = (GF_SockInCtx *) gf_filter_get_udta(filter);
198
199 if (ctx->clients) {
200 while (gf_list_count(ctx->clients)) {
201 GF_SockInClient *sc = gf_list_pop_back(ctx->clients);
202 sockin_client_reset(sc);
203 gf_free(sc);
204 }
205 gf_list_del(ctx->clients);
206 }
207 sockin_client_reset(&ctx->sock_c);
208 if (ctx->buffer) gf_free(ctx->buffer);
209 if (ctx->active_sockets) gf_sk_group_del(ctx->active_sockets);
210 }
211
sockin_probe_url(const char * url,const char * mime_type)212 static GF_FilterProbeScore sockin_probe_url(const char *url, const char *mime_type)
213 {
214 if (!strnicmp(url, "udp://", 6)) return GF_FPROBE_SUPPORTED;
215 if (!strnicmp(url, "tcp://", 6)) return GF_FPROBE_SUPPORTED;
216 #ifdef GPAC_HAS_SOCK_UN
217 if (!strnicmp(url, "udpu://", 7)) return GF_FPROBE_SUPPORTED;
218 if (!strnicmp(url, "tcpu://", 7)) return GF_FPROBE_SUPPORTED;
219 #endif
220 return GF_FPROBE_NOT_SUPPORTED;
221 }
222
sockin_rtp_destructor(GF_Filter * filter,GF_FilterPid * pid,GF_FilterPacket * pck)223 static void sockin_rtp_destructor(GF_Filter *filter, GF_FilterPid *pid, GF_FilterPacket *pck)
224 {
225 u32 size;
226 char *data;
227 GF_SockInClient *sc = (GF_SockInClient *) gf_filter_pid_get_udta(pid);
228 sc->pck_out = GF_FALSE;
229 data = (char *) gf_filter_pck_get_data(pck, &size);
230 if (data) gf_free(data);
231 }
232
sockin_process_event(GF_Filter * filter,const GF_FilterEvent * evt)233 static Bool sockin_process_event(GF_Filter *filter, const GF_FilterEvent *evt)
234 {
235 if (!evt->base.on_pid) return GF_FALSE;
236
237 switch (evt->base.type) {
238 case GF_FEVT_PLAY:
239 return GF_TRUE;
240 case GF_FEVT_STOP:
241 return GF_TRUE;
242 default:
243 break;
244 }
245 return GF_FALSE;
246 }
247
sockin_read_client(GF_Filter * filter,GF_SockInCtx * ctx,GF_SockInClient * sock_c)248 static GF_Err sockin_read_client(GF_Filter *filter, GF_SockInCtx *ctx, GF_SockInClient *sock_c)
249 {
250 u32 nb_read;
251 u64 bitrate;
252 GF_Err e;
253 GF_FilterPacket *dst_pck;
254 u8 *out_data, *in_data;
255
256 if (!sock_c->socket)
257 return GF_EOS;
258 if (sock_c->pck_out)
259 return GF_OK;
260
261 if (sock_c->pid && gf_filter_pid_would_block(sock_c->pid)) {
262 return GF_OK;
263 }
264
265 if (!sock_c->start_time) sock_c->start_time = gf_sys_clock_high_res();
266
267 e = gf_sk_receive_no_select(sock_c->socket, ctx->buffer, ctx->block_size, &nb_read);
268 switch (e) {
269 case GF_IP_NETWORK_EMPTY:
270 return GF_OK;
271 case GF_OK:
272 break;
273 case GF_IP_CONNECTION_CLOSED:
274 if (!sock_c->done) {
275 sock_c->done = GF_TRUE;
276 gf_filter_pid_set_eos(sock_c->pid);
277 }
278 return GF_EOS;
279 default:
280 return e;
281 }
282 if (!nb_read) return GF_OK;
283 sock_c->nb_bytes += nb_read;
284 sock_c->done = GF_FALSE;
285
286 //we allocated one more byte for that
287 ctx->buffer[nb_read] = 0;
288
289 //first run, probe data
290 if (!sock_c->pid) {
291 const char *mime = ctx->mime;
292 //probe MPEG-2
293 if (ctx->tsprobe) {
294 /*TS over RTP signaled as udp */
295 if ((ctx->buffer[0] != 0x47) && ((ctx->buffer[1] & 0x7F) == 33) ) {
296 #ifndef GPAC_DISABLE_STREAMING
297 sock_c->rtp_reorder = gf_rtp_reorderer_new(ctx->reorder_pck, ctx->reorder_delay);
298 #else
299 ctx- >is_rtp = GF_TRUE;
300 #endif
301 mime = "video/mp2t";
302 } else if (ctx->buffer[0] == 0x47) {
303 mime = "video/mp2t";
304 }
305 }
306
307 e = gf_filter_pid_raw_new(filter, ctx->src, NULL, mime, ctx->ext, ctx->buffer, nb_read, GF_TRUE, &sock_c->pid);
308 if (e) return e;
309
310 // if (ctx->is_udp) gf_filter_pid_set_property(sock_c->pid, GF_PROP_PID_UDP, &PROP_BOOL(GF_TRUE) );
311
312 gf_filter_pid_set_udta(sock_c->pid, sock_c);
313
314 #ifdef GPAC_ENABLE_COVERAGE
315 if (gf_sys_is_cov_mode()) {
316 GF_FilterEvent evt;
317 memset(&evt, 0, sizeof(GF_FilterEvent));
318 evt.base.type = GF_FEVT_PLAY;
319 evt.base.on_pid = sock_c->pid;
320 sockin_process_event(filter, &evt);
321 }
322 #endif
323
324 }
325
326 in_data = ctx->buffer;
327
328 #ifndef GPAC_DISABLE_STREAMING
329 if (sock_c->rtp_reorder) {
330 char *pck;
331 u16 seq_num = ((ctx->buffer[2] << 8) & 0xFF00) | (ctx->buffer[3] & 0xFF);
332 gf_rtp_reorderer_add(sock_c->rtp_reorder, (void *) ctx->buffer, nb_read, seq_num);
333
334 pck = (char *) gf_rtp_reorderer_get(sock_c->rtp_reorder, &nb_read, GF_FALSE);
335 if (pck) {
336 dst_pck = gf_filter_pck_new_shared(sock_c->pid, pck+12, nb_read-12, sockin_rtp_destructor);
337 gf_filter_pck_set_framing(dst_pck, GF_TRUE, GF_TRUE);
338 gf_filter_pck_send(dst_pck);
339 }
340 return GF_OK;
341 }
342 #else
343 if (sock_c->is_rtp) {
344 in_data = ctx->buffer + 12;
345 nb_read -= 12;
346 }
347 #endif
348
349 dst_pck = gf_filter_pck_new_alloc(sock_c->pid, nb_read, &out_data);
350 memcpy(out_data, in_data, nb_read);
351
352 gf_filter_pck_set_framing(dst_pck, (sock_c->nb_bytes == nb_read) ? GF_TRUE : GF_FALSE, GF_FALSE);
353 gf_filter_pck_send(dst_pck);
354
355 //send bitrate
356 bitrate = ( gf_sys_clock_high_res() - sock_c->start_time );
357 if (bitrate) {
358 bitrate = (sock_c->nb_bytes * 8 * 1000000) / bitrate;
359 gf_filter_pid_set_property(sock_c->pid, GF_PROP_PID_DOWN_RATE, &PROP_UINT((u32) bitrate) );
360 GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] Receiving from %s at %d kbps\r", sock_c->address, (u32) (bitrate/10)));
361 }
362
363 return GF_OK;
364 }
365
sockin_check_eos(GF_SockInCtx * ctx)366 static Bool sockin_check_eos(GF_SockInCtx *ctx)
367 {
368 u64 now = gf_sys_clock_high_res();
369 if (!ctx->last_rcv_time) {
370 ctx->last_rcv_time = now;
371 return GF_FALSE;
372 }
373 if (now - ctx->last_rcv_time < ctx->timeout*1000) {
374 return GF_FALSE;
375 }
376 if (ctx->sock_c.pid && !ctx->sock_c.done) {
377 gf_filter_pid_set_eos(ctx->sock_c.pid);
378 ctx->sock_c.done = GF_TRUE;
379 }
380 return GF_TRUE;
381 }
382
sockin_process(GF_Filter * filter)383 static GF_Err sockin_process(GF_Filter *filter)
384 {
385 GF_Socket *new_conn=NULL;
386 GF_Err e;
387 u32 i, count;
388 GF_SockInCtx *ctx = (GF_SockInCtx *) gf_filter_get_udta(filter);
389
390 e = gf_sk_group_select(ctx->active_sockets, 10, GF_SK_SELECT_READ);
391 if (e==GF_IP_NETWORK_EMPTY) {
392 if (ctx->is_udp) {
393 if (sockin_check_eos(ctx) )
394 return GF_EOS;
395 } else if (!gf_list_count(ctx->clients)) {
396 gf_filter_ask_rt_reschedule(filter, 1000);
397 return GF_OK;
398 }
399
400 gf_filter_ask_rt_reschedule(filter, 1000);
401 return GF_OK;
402 }
403 else if (e) return e;
404
405 if (gf_sk_group_sock_is_set(ctx->active_sockets, ctx->sock_c.socket, GF_SK_SELECT_READ)) {
406 if (!ctx->listen) {
407 return sockin_read_client(filter, ctx, &ctx->sock_c);
408 }
409
410 if (gf_sk_group_sock_is_set(ctx->active_sockets, ctx->sock_c.socket, GF_SK_SELECT_READ)) {
411 e = gf_sk_accept(ctx->sock_c.socket, &new_conn);
412 if ((e==GF_OK) && new_conn) {
413 GF_SockInClient *sc;
414 GF_SAFEALLOC(sc, GF_SockInClient);
415 if (!sc) return GF_OUT_OF_MEM;
416
417 sc->socket = new_conn;
418 strcpy(sc->address, "unknown");
419 gf_sk_get_remote_address(new_conn, sc->address);
420 gf_sk_set_block_mode(new_conn, !ctx->block);
421
422 GF_LOG(GF_LOG_INFO, GF_LOG_NETWORK, ("[SockIn] Accepting new connection from %s\n", sc->address));
423 gf_list_add(ctx->clients, sc);
424 ctx->had_clients = GF_TRUE;
425 gf_sk_group_register(ctx->active_sockets, sc->socket);
426 }
427 }
428 }
429 if (!ctx->listen) return GF_OK;
430
431 count = gf_list_count(ctx->clients);
432 for (i=0; i<count; i++) {
433 GF_SockInClient *sc = gf_list_get(ctx->clients, i);
434
435 if (!gf_sk_group_sock_is_set(ctx->active_sockets, sc->socket, GF_SK_SELECT_READ)) continue;
436
437 e = sockin_read_client(filter, ctx, sc);
438 if (e == GF_IP_CONNECTION_CLOSED) {
439 GF_LOG(GF_LOG_WARNING, GF_LOG_NETWORK, ("[SockIn] Connection to %s lost, removing input\n", sc->address));
440 if (sc->socket)
441 gf_sk_group_unregister(ctx->active_sockets, sc->socket);
442
443 sockin_client_reset(sc);
444 if (sc->pid) {
445 gf_filter_pid_set_eos(sc->pid);
446 gf_filter_pid_remove(sc->pid);
447 }
448 gf_free(sc);
449 gf_list_del_item(ctx->clients, sc);
450 i--;
451 count--;
452 } else {
453 if (e) return e;
454 }
455 }
456 if (!ctx->had_clients) {
457 //we should use socket groups and selects !
458 gf_filter_ask_rt_reschedule(filter, 100000);
459 return GF_OK;
460 }
461
462 if (!count) {
463 if (ctx->ka) {
464 //keep alive, ask for real-time reschedule of 100 ms - we should use socket groups and selects !
465 gf_filter_ask_rt_reschedule(filter, 100000);
466 } else {
467 return GF_EOS;
468 }
469 }
470 return GF_OK;
471 }
472
473
474
475 #define OFFS(_n) #_n, offsetof(GF_SockInCtx, _n)
476
477 static const GF_FilterArgs SockInArgs[] =
478 {
479 { OFFS(src), "location of source content", GF_PROP_NAME, NULL, NULL, 0},
480 { OFFS(block_size), "block size used to read socket", GF_PROP_UINT, "10000", NULL, GF_FS_ARG_HINT_ADVANCED},
481 { OFFS(sockbuf), "socket max buffer size", GF_PROP_UINT, "65536", NULL, GF_FS_ARG_HINT_ADVANCED},
482 { OFFS(port), "default port if not specified", GF_PROP_UINT, "1234", NULL, 0},
483 { OFFS(ifce), "default multicast interface", GF_PROP_NAME, NULL, NULL, GF_FS_ARG_HINT_ADVANCED},
484 { OFFS(listen), "indicate the input socket works in server mode", GF_PROP_BOOL, "false", NULL, 0},
485 { OFFS(ka), "keep socket alive if no more connections", GF_PROP_BOOL, "false", NULL, GF_FS_ARG_HINT_ADVANCED},
486 { OFFS(maxc), "max number of concurrent connections", GF_PROP_UINT, "+I", NULL, 0},
487 { OFFS(tsprobe), "probe for MPEG-2 TS data, either RTP or raw UDP. Disabled if mime or ext are given and do not match MPEG-2 TS mimes/extensions", GF_PROP_BOOL, "true", NULL, GF_FS_ARG_HINT_ADVANCED},
488 { OFFS(ext), "indicate file extension of udp data", GF_PROP_STRING, NULL, NULL, 0},
489 { OFFS(mime), "indicate mime type of udp data", GF_PROP_STRING, NULL, NULL, 0},
490 { OFFS(block), "set blocking mode for socket(s)", GF_PROP_BOOL, "false", NULL, GF_FS_ARG_HINT_ADVANCED},
491 { OFFS(timeout), "set timeout in ms for UDP socket(s)", GF_PROP_UINT, "5000", NULL, GF_FS_ARG_HINT_ADVANCED},
492
493 #ifndef GPAC_DISABLE_STREAMING
494 { OFFS(reorder_pck), "number of packets delay for RTP reordering (M2TS over RTP) ", GF_PROP_UINT, "100", NULL, GF_FS_ARG_HINT_ADVANCED},
495 { OFFS(reorder_delay), "number of ms delay for RTP reordering (M2TS over RTP)", GF_PROP_UINT, "10", NULL, GF_FS_ARG_HINT_ADVANCED},
496 #endif
497 {0}
498 };
499
500 static const GF_FilterCapability SockInCaps[] =
501 {
502 CAP_UINT(GF_CAPS_OUTPUT, GF_PROP_PID_STREAM_TYPE, GF_STREAM_FILE),
503 };
504
505 GF_FilterRegister SockInRegister = {
506 .name = "sockin",
507 GF_FS_SET_DESCRIPTION("UDP/TCP input")
508 #ifndef GPAC_DISABLE_DOC
509 .help = "This filter handles generic TCP and UDP input sockets. It can also probe for MPEG-2 TS over RTP input. Probing of MPEG-2 TS over UDP/RTP is enabled by default but can be turned off.\n"
510 "\nData format can be specified by setting either [-ext]() or [-mime]() options. If not set, the format will be guessed by probing the first data packet\n"
511 "\n"
512 "- UDP sockets are used for source URLs formatted as `udp://NAME`\n"
513 "- TCP sockets are used for source URLs formatted as `tcp://NAME`\n"
514 #ifdef GPAC_HAS_SOCK_UN
515 "- UDP unix domain sockets are used for source URLs formatted as `udpu://NAME`\n"
516 "- TCP unix domain sockets are used for source URLs formatted as `tcpu://NAME`\n"
517 #ifdef GPAC_CONFIG_DARWIN
518 "\nOn OSX with VM packet replay you will need to force multicast routing, eg: route add -net 239.255.1.4/32 -interface vboxnet0"
519 #endif
520 ""
521 #else
522 "Your platform does not supports unix domain sockets, udpu:// and tcpu:// schemes not supported."
523 #endif
524 ,
525 #endif //GPAC_DISABLE_DOC
526 .private_size = sizeof(GF_SockInCtx),
527 .args = SockInArgs,
528 SETCAPS(SockInCaps),
529 .initialize = sockin_initialize,
530 .finalize = sockin_finalize,
531 .process = sockin_process,
532 .process_event = sockin_process_event,
533 .probe_url = sockin_probe_url
534 };
535
536
sockin_register(GF_FilterSession * session)537 const GF_FilterRegister *sockin_register(GF_FilterSession *session)
538 {
539 return &SockInRegister;
540 }
541
542