1 /*
2 * GPAC - Multimedia Framework C SDK
3 *
4 * Authors: Jean Le Feuvre
5 * Copyright (c) Telecom ParisTech 2017-2020
6 * All rights reserved
7 *
8 * This file is part of GPAC / HTTP input filter using GPAC http stack
9 *
10 * GPAC is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU Lesser General Public License as published by
12 * the Free Software Foundation; either version 2, or (at your option)
13 * any later version.
14 *
15 * GPAC is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU Lesser General Public License for more details.
19 *
20 * You should have received a copy of the GNU Lesser General Public
21 * License along with this library; see the file COPYING. If not, write to
22 * the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23 *
24 */
25
26
27 #include <gpac/filters.h>
28 #include <gpac/constants.h>
29 #include <gpac/download.h>
30
31
/* Storage policy for downloaded data. The values follow, in order, the
 * "disk|keep|mem|none" choices of the "cache" option declared in HTTPInArgs. */
typedef enum
{
	/* cache to disk, entry may be discarded when the session no longer uses it */
	GF_HTTPIN_STORE_DISK = 0,
	/* cache to disk and keep the entry */
	GF_HTTPIN_STORE_DISK_KEEP = 1,
	/* keep data in memory (session created with GF_NETIO_SESSION_MEMORY_CACHE) */
	GF_HTTPIN_STORE_MEM = 2,
	/* no cache (session created with GF_NETIO_SESSION_NOT_CACHED) */
	GF_HTTPIN_STORE_NONE = 3
} GF_HTTPInStoreMode;
39
40 typedef struct
41 {
42 //options
43 char *src;
44 u32 block_size;
45 GF_HTTPInStoreMode cache;
46 GF_Fraction64 range;
47 char *ext;
48 char *mime;
49
50 //internal
51 Bool initial_ack_done;
52 GF_DownloadManager *dm;
53
54 //only one output pid declared
55 GF_FilterPid *pid;
56
57 GF_DownloadSession *sess;
58
59 char *block;
60 Bool pck_out, is_end;
61 u64 nb_read, file_size;
62 FILE *cached;
63
64 Bool do_reconfigure;
65 Bool full_file_only;
66 GF_Err last_state;
67 } GF_HTTPInCtx;
68
/* Report error e to the filter session, at most once per "OK" period:
 * - before the filter was acknowledged, as a setup failure (and mark it acknowledged),
 * - afterwards, as a non-fatal notification failure.
 * Does nothing when filter is NULL or when ctx->last_state already holds an error;
 * ctx is not dereferenced when filter is NULL. */
static void httpin_notify_error(GF_Filter *filter, GF_HTTPInCtx *ctx, GF_Err e)
{
	if (!filter) return;
	if (ctx->last_state != GF_OK) return;

	if (ctx->initial_ack_done) {
		gf_filter_notification_failure(filter, e, GF_FALSE);
	} else {
		gf_filter_setup_failure(filter, e);
		ctx->initial_ack_done = GF_TRUE;
	}
	ctx->last_state = e;
}
81
httpin_initialize(GF_Filter * filter)82 static GF_Err httpin_initialize(GF_Filter *filter)
83 {
84 GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
85 GF_Err e;
86 char *server;
87 u32 flags = 0;
88
89 if (!ctx || !ctx->src) return GF_BAD_PARAM;
90 ctx->dm = gf_filter_get_download_manager(filter);
91 if (!ctx->dm) return GF_SERVICE_ERROR;
92
93 ctx->block = gf_malloc(ctx->block_size +1);
94
95 flags = GF_NETIO_SESSION_NOT_THREADED | GF_NETIO_SESSION_PERSISTENT;
96 if (ctx->cache==GF_HTTPIN_STORE_MEM) flags |= GF_NETIO_SESSION_MEMORY_CACHE;
97 else if (ctx->cache==GF_HTTPIN_STORE_NONE) flags |= GF_NETIO_SESSION_NOT_CACHED;
98
99 server = strstr(ctx->src, "://");
100 if (server) server += 3;
101 if (server && strstr(server, "://")) {
102 ctx->is_end = GF_TRUE;
103 return gf_filter_pid_raw_new(filter, server, server, NULL, NULL, NULL, 0, GF_FALSE, &ctx->pid);
104 }
105
106 ctx->sess = gf_dm_sess_new(ctx->dm, ctx->src, flags, NULL, NULL, &e);
107 if (e) {
108 gf_filter_setup_failure(filter, e);
109 ctx->initial_ack_done = GF_TRUE;
110 return e;
111 }
112 if (ctx->range.den) {
113 gf_dm_sess_set_range(ctx->sess, ctx->range.num, ctx->range.den, GF_TRUE);
114 }
115
116 #ifdef GPAC_ENABLE_COVERAGE
117 if (gf_sys_is_cov_mode())
118 httpin_notify_error(NULL, NULL, GF_OK);
119 #endif
120
121 return GF_OK;
122 }
123
/* Filter destructor: releases whichever of the download session, the read
 * buffer and the cache file handle were created. */
void httpin_finalize(GF_Filter *filter)
{
	GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
	GF_DownloadSession *session = ctx->sess;
	char *buffer = ctx->block;
	FILE *cache_file = ctx->cached;

	if (session != NULL) {
		gf_dm_sess_del(session);
	}
	if (buffer != NULL) {
		gf_free(buffer);
	}
	if (cache_file != NULL) {
		gf_fclose(cache_file);
	}
}
133
httpin_probe_url(const char * url,const char * mime_type)134 static GF_FilterProbeScore httpin_probe_url(const char *url, const char *mime_type)
135 {
136 if (!strnicmp(url, "http://", 7) ) return GF_FPROBE_SUPPORTED;
137 if (!strnicmp(url, "https://", 8) ) return GF_FPROBE_SUPPORTED;
138 if (!strnicmp(url, "gmem://", 7) ) return GF_FPROBE_SUPPORTED;
139 return GF_FPROBE_NOT_SUPPORTED;
140 }
141
httpin_process_event(GF_Filter * filter,const GF_FilterEvent * evt)142 static Bool httpin_process_event(GF_Filter *filter, const GF_FilterEvent *evt)
143 {
144 GF_Err e;
145 GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
146
147 if (evt->base.on_pid && (evt->base.on_pid != ctx->pid)) return GF_FALSE;
148
149 switch (evt->base.type) {
150 case GF_FEVT_PLAY:
151 ctx->is_end = GF_FALSE;
152 ctx->full_file_only = evt->play.full_file_only;
153 return GF_TRUE;
154 case GF_FEVT_STOP:
155 if (!ctx->is_end) {
156 //abort session
157 gf_filter_pid_set_eos(ctx->pid);
158 ctx->is_end = GF_TRUE;
159 if (ctx->sess) {
160 gf_dm_sess_abort(ctx->sess);
161 gf_dm_sess_del(ctx->sess);
162 ctx->sess = NULL;
163 }
164 }
165 return GF_TRUE;
166 case GF_FEVT_SOURCE_SEEK:
167 if (evt->seek.start_offset < ctx->file_size) {
168 ctx->is_end = GF_FALSE;
169 //open cache if needed
170 if (!ctx->cached && ctx->file_size && (ctx->nb_read==ctx->file_size) && ctx->sess) {
171 const char *cached = gf_dm_sess_get_cache_name(ctx->sess);
172 if (cached) ctx->cached = gf_fopen(cached, "rb");
173 }
174 ctx->nb_read = evt->seek.start_offset;
175
176 if (ctx->cached) {
177 gf_fseek(ctx->cached, ctx->nb_read, SEEK_SET);
178 } else if (ctx->sess) {
179 gf_dm_sess_abort(ctx->sess);
180 gf_dm_sess_set_range(ctx->sess, ctx->nb_read, 0, GF_TRUE);
181 }
182 ctx->range.den = 0;
183 ctx->range.num = ctx->nb_read;
184 ctx->last_state = GF_OK;
185 } else {
186 GF_LOG(GF_LOG_ERROR, GF_LOG_HTTP, ("[HTTPIn] Requested seek outside file range !\n") );
187 ctx->is_end = GF_TRUE;
188 gf_filter_pid_set_eos(ctx->pid);
189 }
190 return GF_TRUE;
191 case GF_FEVT_SOURCE_SWITCH:
192 assert(ctx->is_end);
193 assert(!ctx->pck_out);
194 if (evt->seek.source_switch) {
195 if (ctx->src && ctx->sess && (ctx->cache!=GF_HTTPIN_STORE_DISK_KEEP) && !evt->seek.previous_is_init_segment) {
196 gf_dm_delete_cached_file_entry_session(ctx->sess, ctx->src);
197 }
198 if (ctx->src) gf_free(ctx->src);
199 ctx->src = gf_strdup(evt->seek.source_switch);
200 }
201 if (ctx->cached) gf_fclose(ctx->cached);
202 ctx->cached = NULL;
203
204 //abort type
205 if (evt->seek.start_offset == (u64) -1) {
206 if (!ctx->is_end) {
207 if (ctx->sess)
208 gf_dm_sess_abort(ctx->sess);
209 ctx->is_end = GF_TRUE;
210 gf_filter_pid_set_eos(ctx->pid);
211 }
212 ctx->nb_read = 0;
213 ctx->last_state = GF_OK;
214 return GF_TRUE;
215 }
216 ctx->last_state = GF_OK;
217 if (ctx->sess) {
218 e = gf_dm_sess_setup_from_url(ctx->sess, ctx->src, evt->seek.skip_cache_expiration);
219 } else {
220 u32 flags;
221
222 flags = GF_NETIO_SESSION_NOT_THREADED | GF_NETIO_SESSION_PERSISTENT;
223 if (ctx->cache==GF_HTTPIN_STORE_MEM) flags |= GF_NETIO_SESSION_MEMORY_CACHE;
224 else if (ctx->cache==GF_HTTPIN_STORE_NONE) flags |= GF_NETIO_SESSION_NOT_CACHED;
225
226 ctx->sess = gf_dm_sess_new(ctx->dm, ctx->src, flags, NULL, NULL, &e);
227 }
228
229 if (!e) e = gf_dm_sess_set_range(ctx->sess, evt->seek.start_offset, evt->seek.end_offset, GF_TRUE);
230 if (e) {
231 GF_LOG(GF_LOG_ERROR, GF_LOG_HTTP, ("[HTTPIn] Cannot resetup session from URL %s: %s\n", ctx->src, gf_error_to_string(e) ) );
232 httpin_notify_error(filter, ctx, e);
233 ctx->is_end = GF_TRUE;
234 if (ctx->src) gf_free(ctx->src);
235 ctx->src = NULL;
236 return GF_TRUE;
237 }
238 ctx->nb_read = ctx->file_size = 0;
239 ctx->do_reconfigure = GF_TRUE;
240 ctx->is_end = GF_FALSE;
241 ctx->last_state = GF_OK;
242 gf_filter_post_process_task(filter);
243 return GF_TRUE;
244 default:
245 break;
246 }
247 return GF_TRUE;
248 }
249
/* Destructor of the shared packets pointing to ctx->block: the buffer can be
 * reused, so clear the in-flight flag and ask for a new process() call. */
static void httpin_rel_pck(GF_Filter *filter, GF_FilterPid *pid, GF_FilterPacket *pck)
{
	GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);

	(void) pid;
	(void) pck;

	ctx->pck_out = GF_FALSE;
	gf_filter_post_process_task(filter);
}
257
httpin_process(GF_Filter * filter)258 static GF_Err httpin_process(GF_Filter *filter)
259 {
260 Bool is_start;
261 u32 nb_read=0;
262 GF_FilterPacket *pck;
263 GF_Err e=GF_OK;
264 u32 bytes_per_sec=0;
265 u64 bytes_done=0, total_size, byte_offset;
266 GF_NetIOStatus net_status;
267 GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
268
269 //until packet is released we return EOS (no processing), and ask for processing again upon release
270 if (ctx->pck_out)
271 return GF_EOS;
272
273 if (ctx->is_end)
274 return GF_EOS;
275
276 if (!ctx->sess)
277 return GF_EOS;
278
279 if (!ctx->pid) {
280 if (ctx->nb_read) return GF_SERVICE_ERROR;
281 } else {
282 //TODO: go on fetching data to cache even when not consuming, and reread from cache
283 if (gf_filter_pid_would_block(ctx->pid))
284 return GF_OK;
285 }
286
287 is_start = ctx->nb_read ? GF_FALSE : GF_TRUE;
288 ctx->is_end = GF_FALSE;
289
290 //we read from cache file
291 if (ctx->cached) {
292 u32 to_read;
293 u64 lto_read = ctx->file_size - ctx->nb_read;
294
295 if (lto_read > (u64) ctx->block_size)
296 to_read = (u64) ctx->block_size;
297 else
298 to_read = (u32) lto_read;
299
300 if (ctx->full_file_only) {
301 ctx->is_end = GF_TRUE;
302 pck = gf_filter_pck_new_shared(ctx->pid, ctx->block, 0, httpin_rel_pck);
303 gf_filter_pck_set_framing(pck, is_start, ctx->is_end);
304
305 //mark packet out BEFORE sending, since the call to send() may destroy the packet if cloned
306 ctx->pck_out = GF_TRUE;
307 gf_filter_pck_send(pck);
308
309 gf_filter_pid_set_eos(ctx->pid);
310 return GF_EOS;
311 }
312 nb_read = (u32) gf_fread(ctx->block, to_read, ctx->cached);
313 bytes_per_sec = 0;
314
315 }
316 //we read from network
317 else {
318
319 e = gf_dm_sess_fetch_data(ctx->sess, ctx->block, ctx->block_size, &nb_read);
320 if (e<0) {
321 if (e==GF_IP_NETWORK_EMPTY) {
322 if (ctx->pid) {
323 gf_dm_sess_get_stats(ctx->sess, NULL, NULL, NULL, NULL, &bytes_per_sec, NULL);
324 gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_RATE, &PROP_UINT(8*bytes_per_sec) );
325 }
326 gf_filter_ask_rt_reschedule(filter, 1000);
327 return GF_OK;
328 }
329 if (! ctx->nb_read)
330 httpin_notify_error(filter, ctx, e);
331
332 ctx->is_end = GF_TRUE;
333 if (ctx->pid)
334 gf_filter_pid_set_eos(ctx->pid);
335 return e;
336 }
337 gf_dm_sess_get_stats(ctx->sess, NULL, NULL, &total_size, &bytes_done, &bytes_per_sec, &net_status);
338
339 //wait until we have some data to declare the pid
340 if ((e!= GF_EOS) && !nb_read) return GF_OK;
341
342 if (!ctx->pid || ctx->do_reconfigure) {
343 u32 idx;
344 const char *hname, *hval;
345 const char *cached = gf_dm_sess_get_cache_name(ctx->sess);
346
347 ctx->do_reconfigure = GF_FALSE;
348
349 if ((e==GF_EOS) && cached && strnicmp(cached, "gmem://", 7)) {
350 ctx->cached = gf_fopen(cached, "rb");
351 if (ctx->cached) {
352 nb_read = (u32) gf_fread(ctx->block, ctx->block_size, ctx->cached);
353 } else {
354 GF_LOG(GF_LOG_ERROR, GF_LOG_HTTP, ("[HTTPIn] Failed to open cached file %s\n", cached));
355 }
356 }
357 ctx->file_size = total_size;
358 ctx->block[nb_read] = 0;
359 e = gf_filter_pid_raw_new(filter, ctx->src, cached, ctx->mime ? ctx->mime : gf_dm_sess_mime_type(ctx->sess), ctx->ext, ctx->block, nb_read, ctx->mime ? GF_TRUE : GF_FALSE, &ctx->pid);
360 if (e) return e;
361
362 gf_filter_pid_set_property(ctx->pid, GF_PROP_PID_FILE_CACHED, &PROP_BOOL(GF_FALSE) );
363
364 if (!ctx->initial_ack_done) {
365 ctx->initial_ack_done = GF_TRUE;
366 gf_filter_pid_set_property(ctx->pid, GF_PROP_PID_DOWNLOAD_SESSION, &PROP_POINTER( (void*)ctx->sess ) );
367 }
368
369 /*in test mode don't expose http headers (they contain date/version/etc)*/
370 if (! gf_sys_is_test_mode()) {
371 idx = 0;
372 while (gf_dm_sess_enum_headers(ctx->sess, &idx, &hname, &hval) == GF_OK) {
373 gf_filter_pid_set_property_dyn(ctx->pid, (char *) hname, & PROP_STRING(hval));
374 }
375 }
376 }
377
378 gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_RATE, &PROP_UINT(8*bytes_per_sec) );
379 if (ctx->range.num && ctx->file_size) {
380 gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_BYTES, &PROP_LONGUINT(bytes_done + ctx->range.num) );
381 gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_SIZE, &PROP_LONGUINT(ctx->file_size) );
382 } else {
383 gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_BYTES, &PROP_LONGUINT(bytes_done) );
384 gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_SIZE, &PROP_LONGUINT(ctx->file_size ? ctx->file_size : bytes_done) );
385 }
386 }
387
388 byte_offset = ctx->nb_read;
389
390 ctx->nb_read += nb_read;
391 if (ctx->file_size && (ctx->nb_read==ctx->file_size)) {
392 ctx->is_end = GF_TRUE;
393 } else if (e==GF_EOS) {
394 ctx->is_end = GF_TRUE;
395 }
396
397 pck = gf_filter_pck_new_shared(ctx->pid, ctx->block, nb_read, httpin_rel_pck);
398 if (!pck) return GF_OK;
399
400 gf_filter_pck_set_cts(pck, 0);
401
402 gf_filter_pck_set_framing(pck, is_start, ctx->is_end);
403 gf_filter_pck_set_sap(pck, GF_FILTER_SAP_1);
404 gf_filter_pck_set_byte_offset(pck, byte_offset);
405
406 //mark packet out BEFORE sending, since the call to send() may destroy the packet if cloned
407 ctx->pck_out = GF_TRUE;
408 gf_filter_pck_send(pck);
409
410 if (ctx->file_size && gf_filter_reporting_enabled(filter)) {
411 char szStatus[1024], *szSrc;
412 szSrc = gf_file_basename(ctx->src);
413
414 sprintf(szStatus, "%s: % 16"LLD_SUF" /% 16"LLD_SUF" (%02.02f) % 8d kbps", szSrc, (s64) bytes_done, (s64) ctx->file_size, ((Double)bytes_done*100.0)/ctx->file_size, bytes_per_sec*8/1000);
415 gf_filter_update_status(filter, (u32) (bytes_done*10000/ctx->file_size), szStatus);
416 }
417
418 if (ctx->is_end) {
419 const char *cached = gf_dm_sess_get_cache_name(ctx->sess);
420 if (cached)
421 gf_filter_pid_set_property(ctx->pid, GF_PROP_PID_FILE_CACHED, &PROP_BOOL(GF_TRUE) );
422
423 gf_filter_pid_set_eos(ctx->pid);
424 return GF_EOS;
425 }
426
427 return ctx->pck_out ? GF_EOS : GF_OK;
428 }
429
430
431
432 #define OFFS(_n) #_n, offsetof(GF_HTTPInCtx, _n)
433
434 static const GF_FilterArgs HTTPInArgs[] =
435 {
436 { OFFS(src), "location of source content", GF_PROP_NAME, NULL, NULL, 0},
437 { OFFS(block_size), "block size used to read file", GF_PROP_UINT, "100000", NULL, GF_FS_ARG_HINT_ADVANCED},
438 { OFFS(cache), "set cache mode\n"
439 "- disk: cache to disk, discard once session is no longer used\n"
440 "- disk: cache to disk and keep\n"
441 "- mem: stores to memory, discard once session is no longer used\n"
442 "- none: no cache", GF_PROP_UINT, "disk", "disk|keep|mem|none", GF_FS_ARG_HINT_ADVANCED},
443 { OFFS(range), "set byte range, as fraction", GF_PROP_FRACTION64, "0-0", NULL, 0},
444 { OFFS(ext), "override file extension", GF_PROP_NAME, NULL, NULL, 0},
445 { OFFS(mime), "set file mime type", GF_PROP_NAME, NULL, NULL, 0},
446 {0}
447 };
448
/* Capabilities: the filter declares a single output capability, streams of type GF_STREAM_FILE. */
static const GF_FilterCapability HTTPInCaps[] =
{
	CAP_UINT(GF_CAPS_OUTPUT, GF_PROP_PID_STREAM_TYPE, GF_STREAM_FILE),
};
453
454 GF_FilterRegister HTTPInRegister = {
455 .name = "httpin",
456 GF_FS_SET_DESCRIPTION("HTTP input")
457 GF_FS_SET_HELP("This filter dispatch raw blocks from a remote HTTP resource into a filter chain.\n"
458 "Block size can be adjusted using [-block_size](), and disk caching policies can be adjusted.\n"
459 "Content format can be forced through [-mime]() and file extension can be changed through [-ext]().\n"
460 "Note: Unless disabled at session level (see [-no-probe](CORE) ), file extensions are usually ignored and format probing is done on the first data block.")
461 .private_size = sizeof(GF_HTTPInCtx),
462 .flags = GF_FS_REG_BLOCKING,
463 .args = HTTPInArgs,
464 SETCAPS(HTTPInCaps),
465 .initialize = httpin_initialize,
466 .finalize = httpin_finalize,
467 .process = httpin_process,
468 .process_event = httpin_process_event,
469 .probe_url = httpin_probe_url
470 };
471
472
httpin_register(GF_FilterSession * session)473 const GF_FilterRegister *httpin_register(GF_FilterSession *session)
474 {
475 return &HTTPInRegister;
476 }
477
478