1 /*
2  *			GPAC - Multimedia Framework C SDK
3  *
4  *			Authors: Jean Le Feuvre
5  *			Copyright (c) Telecom ParisTech 2017-2020
6  *					All rights reserved
7  *
8  *  This file is part of GPAC / HTTP input filter using GPAC http stack
9  *
10  *  GPAC is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU Lesser General Public License as published by
12  *  the Free Software Foundation; either version 2, or (at your option)
13  *  any later version.
14  *
15  *  GPAC is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU Lesser General Public License for more details.
19  *
20  *  You should have received a copy of the GNU Lesser General Public
21  *  License along with this library; see the file COPYING.  If not, write to
22  *  the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
23  *
24  */
25 
26 
27 #include <gpac/filters.h>
28 #include <gpac/constants.h>
29 #include <gpac/download.h>
30 
31 
32 typedef enum
33 {
34 	GF_HTTPIN_STORE_DISK=0,
35 	GF_HTTPIN_STORE_DISK_KEEP,
36 	GF_HTTPIN_STORE_MEM,
37 	GF_HTTPIN_STORE_NONE,
38 } GF_HTTPInStoreMode;
39 
40 typedef struct
41 {
42 	//options
43 	char *src;
44 	u32 block_size;
45 	GF_HTTPInStoreMode cache;
46 	GF_Fraction64 range;
47 	char *ext;
48 	char *mime;
49 
50 	//internal
51 	Bool initial_ack_done;
52 	GF_DownloadManager *dm;
53 
54 	//only one output pid declared
55 	GF_FilterPid *pid;
56 
57 	GF_DownloadSession *sess;
58 
59 	char *block;
60 	Bool pck_out, is_end;
61 	u64 nb_read, file_size;
62 	FILE *cached;
63 
64 	Bool do_reconfigure;
65 	Bool full_file_only;
66 	GF_Err last_state;
67 } GF_HTTPInCtx;
68 
httpin_notify_error(GF_Filter * filter,GF_HTTPInCtx * ctx,GF_Err e)69 static void httpin_notify_error(GF_Filter *filter, GF_HTTPInCtx *ctx, GF_Err e)
70 {
71 	if (filter && (ctx->last_state == GF_OK)) {
72 		if (!ctx->initial_ack_done) {
73 			gf_filter_setup_failure(filter, e);
74 			ctx->initial_ack_done = GF_TRUE;
75 		} else {
76 			gf_filter_notification_failure(filter, e, GF_FALSE);
77 		}
78 		ctx->last_state = e;
79 	}
80 }
81 
httpin_initialize(GF_Filter * filter)82 static GF_Err httpin_initialize(GF_Filter *filter)
83 {
84 	GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
85 	GF_Err e;
86 	char *server;
87 	u32 flags = 0;
88 
89 	if (!ctx || !ctx->src) return GF_BAD_PARAM;
90 	ctx->dm = gf_filter_get_download_manager(filter);
91 	if (!ctx->dm) return GF_SERVICE_ERROR;
92 
93 	ctx->block = gf_malloc(ctx->block_size +1);
94 
95 	flags = GF_NETIO_SESSION_NOT_THREADED | GF_NETIO_SESSION_PERSISTENT;
96 	if (ctx->cache==GF_HTTPIN_STORE_MEM) flags |= GF_NETIO_SESSION_MEMORY_CACHE;
97 	else if (ctx->cache==GF_HTTPIN_STORE_NONE) flags |= GF_NETIO_SESSION_NOT_CACHED;
98 
99 	server = strstr(ctx->src, "://");
100 	if (server) server += 3;
101 	if (server && strstr(server, "://")) {
102 		ctx->is_end = GF_TRUE;
103 		return gf_filter_pid_raw_new(filter, server, server, NULL, NULL, NULL, 0, GF_FALSE, &ctx->pid);
104 	}
105 
106 	ctx->sess = gf_dm_sess_new(ctx->dm, ctx->src, flags, NULL, NULL, &e);
107 	if (e) {
108 		gf_filter_setup_failure(filter, e);
109 		ctx->initial_ack_done = GF_TRUE;
110 		return e;
111 	}
112 	if (ctx->range.den) {
113 		gf_dm_sess_set_range(ctx->sess, ctx->range.num, ctx->range.den, GF_TRUE);
114 	}
115 
116 #ifdef GPAC_ENABLE_COVERAGE
117 	if (gf_sys_is_cov_mode())
118 		httpin_notify_error(NULL, NULL, GF_OK);
119 #endif
120 
121 	return GF_OK;
122 }
123 
httpin_finalize(GF_Filter * filter)124 void httpin_finalize(GF_Filter *filter)
125 {
126 	GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
127 
128 	if (ctx->sess) gf_dm_sess_del(ctx->sess);
129 
130 	if (ctx->block) gf_free(ctx->block);
131 	if (ctx->cached) gf_fclose(ctx->cached);
132 }
133 
httpin_probe_url(const char * url,const char * mime_type)134 static GF_FilterProbeScore httpin_probe_url(const char *url, const char *mime_type)
135 {
136 	if (!strnicmp(url, "http://", 7) ) return GF_FPROBE_SUPPORTED;
137 	if (!strnicmp(url, "https://", 8) ) return GF_FPROBE_SUPPORTED;
138 	if (!strnicmp(url, "gmem://", 7) ) return GF_FPROBE_SUPPORTED;
139 	return GF_FPROBE_NOT_SUPPORTED;
140 }
141 
httpin_process_event(GF_Filter * filter,const GF_FilterEvent * evt)142 static Bool httpin_process_event(GF_Filter *filter, const GF_FilterEvent *evt)
143 {
144 	GF_Err e;
145 	GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
146 
147 	if (evt->base.on_pid && (evt->base.on_pid != ctx->pid)) return GF_FALSE;
148 
149 	switch (evt->base.type) {
150 	case GF_FEVT_PLAY:
151 		ctx->is_end = GF_FALSE;
152 		ctx->full_file_only = evt->play.full_file_only;
153 		return GF_TRUE;
154 	case GF_FEVT_STOP:
155 		if (!ctx->is_end) {
156 			//abort session
157 			gf_filter_pid_set_eos(ctx->pid);
158 			ctx->is_end = GF_TRUE;
159 			if (ctx->sess) {
160 				gf_dm_sess_abort(ctx->sess);
161 				gf_dm_sess_del(ctx->sess);
162 				ctx->sess = NULL;
163 			}
164 		}
165 		return GF_TRUE;
166 	case GF_FEVT_SOURCE_SEEK:
167 		if (evt->seek.start_offset < ctx->file_size) {
168 			ctx->is_end = GF_FALSE;
169 			//open cache if needed
170 			if (!ctx->cached && ctx->file_size && (ctx->nb_read==ctx->file_size) && ctx->sess) {
171 				const char *cached = gf_dm_sess_get_cache_name(ctx->sess);
172 				if (cached) ctx->cached = gf_fopen(cached, "rb");
173 			}
174 			ctx->nb_read = evt->seek.start_offset;
175 
176 			if (ctx->cached) {
177 				gf_fseek(ctx->cached, ctx->nb_read, SEEK_SET);
178 			} else if (ctx->sess) {
179 				gf_dm_sess_abort(ctx->sess);
180 				gf_dm_sess_set_range(ctx->sess, ctx->nb_read, 0, GF_TRUE);
181 			}
182 			ctx->range.den = 0;
183 			ctx->range.num = ctx->nb_read;
184 			ctx->last_state = GF_OK;
185 		} else {
186 			GF_LOG(GF_LOG_ERROR, GF_LOG_HTTP, ("[HTTPIn] Requested seek outside file range !\n") );
187 			ctx->is_end = GF_TRUE;
188 			gf_filter_pid_set_eos(ctx->pid);
189 		}
190 		return GF_TRUE;
191 	case GF_FEVT_SOURCE_SWITCH:
192 		assert(ctx->is_end);
193 		assert(!ctx->pck_out);
194 		if (evt->seek.source_switch) {
195 			if (ctx->src && ctx->sess && (ctx->cache!=GF_HTTPIN_STORE_DISK_KEEP) && !evt->seek.previous_is_init_segment) {
196 				gf_dm_delete_cached_file_entry_session(ctx->sess, ctx->src);
197 			}
198 			if (ctx->src) gf_free(ctx->src);
199 			ctx->src = gf_strdup(evt->seek.source_switch);
200 		}
201 		if (ctx->cached) gf_fclose(ctx->cached);
202 		ctx->cached = NULL;
203 
204 		//abort type
205 		if (evt->seek.start_offset == (u64) -1) {
206 			if (!ctx->is_end) {
207 				if (ctx->sess)
208 					gf_dm_sess_abort(ctx->sess);
209 				ctx->is_end = GF_TRUE;
210 				gf_filter_pid_set_eos(ctx->pid);
211 			}
212 			ctx->nb_read = 0;
213 			ctx->last_state = GF_OK;
214 			return GF_TRUE;
215 		}
216 		ctx->last_state = GF_OK;
217 		if (ctx->sess) {
218 			e = gf_dm_sess_setup_from_url(ctx->sess, ctx->src, evt->seek.skip_cache_expiration);
219 		} else {
220 			u32 flags;
221 
222 			flags = GF_NETIO_SESSION_NOT_THREADED | GF_NETIO_SESSION_PERSISTENT;
223 			if (ctx->cache==GF_HTTPIN_STORE_MEM) flags |= GF_NETIO_SESSION_MEMORY_CACHE;
224 			else if (ctx->cache==GF_HTTPIN_STORE_NONE) flags |= GF_NETIO_SESSION_NOT_CACHED;
225 
226 			ctx->sess = gf_dm_sess_new(ctx->dm, ctx->src, flags, NULL, NULL, &e);
227 		}
228 
229 		if (!e) e = gf_dm_sess_set_range(ctx->sess, evt->seek.start_offset, evt->seek.end_offset, GF_TRUE);
230 		if (e) {
231 			GF_LOG(GF_LOG_ERROR, GF_LOG_HTTP, ("[HTTPIn] Cannot resetup session from URL %s: %s\n", ctx->src, gf_error_to_string(e) ) );
232 			httpin_notify_error(filter, ctx, e);
233 			ctx->is_end = GF_TRUE;
234 			if (ctx->src) gf_free(ctx->src);
235 			ctx->src = NULL;
236 			return GF_TRUE;
237 		}
238 		ctx->nb_read = ctx->file_size = 0;
239 		ctx->do_reconfigure = GF_TRUE;
240 		ctx->is_end = GF_FALSE;
241 		ctx->last_state = GF_OK;
242 		gf_filter_post_process_task(filter);
243 		return GF_TRUE;
244 	default:
245 		break;
246 	}
247 	return GF_TRUE;
248 }
249 
httpin_rel_pck(GF_Filter * filter,GF_FilterPid * pid,GF_FilterPacket * pck)250 static void httpin_rel_pck(GF_Filter *filter, GF_FilterPid *pid, GF_FilterPacket *pck)
251 {
252 	GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
253 	ctx->pck_out = GF_FALSE;
254 	//ready to process again
255 	gf_filter_post_process_task(filter);
256 }
257 
httpin_process(GF_Filter * filter)258 static GF_Err httpin_process(GF_Filter *filter)
259 {
260 	Bool is_start;
261 	u32 nb_read=0;
262 	GF_FilterPacket *pck;
263 	GF_Err e=GF_OK;
264 	u32 bytes_per_sec=0;
265 	u64 bytes_done=0, total_size, byte_offset;
266 	GF_NetIOStatus net_status;
267 	GF_HTTPInCtx *ctx = (GF_HTTPInCtx *) gf_filter_get_udta(filter);
268 
269 	//until packet is released we return EOS (no processing), and ask for processing again upon release
270 	if (ctx->pck_out)
271 		return GF_EOS;
272 
273 	if (ctx->is_end)
274 		return GF_EOS;
275 
276 	if (!ctx->sess)
277 		return GF_EOS;
278 
279 	if (!ctx->pid) {
280 		if (ctx->nb_read) return GF_SERVICE_ERROR;
281 	} else {
282 		//TODO: go on fetching data to cache even when not consuming, and reread from cache
283 		if (gf_filter_pid_would_block(ctx->pid))
284 			return GF_OK;
285 	}
286 
287 	is_start = ctx->nb_read ? GF_FALSE : GF_TRUE;
288 	ctx->is_end = GF_FALSE;
289 
290 	//we read from cache file
291 	if (ctx->cached) {
292 		u32 to_read;
293 		u64 lto_read = ctx->file_size - ctx->nb_read;
294 
295 		if (lto_read > (u64) ctx->block_size)
296 			to_read = (u64) ctx->block_size;
297 		else
298 			to_read = (u32) lto_read;
299 
300 		if (ctx->full_file_only) {
301 			ctx->is_end = GF_TRUE;
302 			pck = gf_filter_pck_new_shared(ctx->pid, ctx->block, 0, httpin_rel_pck);
303 			gf_filter_pck_set_framing(pck, is_start, ctx->is_end);
304 
305 			//mark packet out BEFORE sending, since the call to send() may destroy the packet if cloned
306 			ctx->pck_out = GF_TRUE;
307 			gf_filter_pck_send(pck);
308 
309 			gf_filter_pid_set_eos(ctx->pid);
310 			return GF_EOS;
311 		}
312 		nb_read = (u32) gf_fread(ctx->block, to_read, ctx->cached);
313 		bytes_per_sec = 0;
314 
315 	}
316 	//we read from network
317 	else {
318 
319 		e = gf_dm_sess_fetch_data(ctx->sess, ctx->block, ctx->block_size, &nb_read);
320 		if (e<0) {
321 			if (e==GF_IP_NETWORK_EMPTY) {
322 				if (ctx->pid) {
323 					gf_dm_sess_get_stats(ctx->sess, NULL, NULL, NULL, NULL, &bytes_per_sec, NULL);
324 					gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_RATE, &PROP_UINT(8*bytes_per_sec) );
325 				}
326 				gf_filter_ask_rt_reschedule(filter, 1000);
327 				return GF_OK;
328 			}
329 			if (! ctx->nb_read)
330 				httpin_notify_error(filter, ctx, e);
331 
332 			ctx->is_end = GF_TRUE;
333 			if (ctx->pid)
334 				gf_filter_pid_set_eos(ctx->pid);
335 			return e;
336 		}
337 		gf_dm_sess_get_stats(ctx->sess, NULL, NULL, &total_size, &bytes_done, &bytes_per_sec, &net_status);
338 
339 		//wait until we have some data to declare the pid
340 		if ((e!= GF_EOS) && !nb_read) return GF_OK;
341 
342 		if (!ctx->pid || ctx->do_reconfigure) {
343 			u32 idx;
344 			const char *hname, *hval;
345 			const char *cached = gf_dm_sess_get_cache_name(ctx->sess);
346 
347 			ctx->do_reconfigure = GF_FALSE;
348 
349 			if ((e==GF_EOS) && cached && strnicmp(cached, "gmem://", 7)) {
350 				ctx->cached = gf_fopen(cached, "rb");
351 				if (ctx->cached) {
352 					nb_read = (u32) gf_fread(ctx->block, ctx->block_size, ctx->cached);
353 				} else {
354 					GF_LOG(GF_LOG_ERROR, GF_LOG_HTTP, ("[HTTPIn] Failed to open cached file %s\n", cached));
355 				}
356 			}
357 			ctx->file_size = total_size;
358 			ctx->block[nb_read] = 0;
359 			e = gf_filter_pid_raw_new(filter, ctx->src, cached, ctx->mime ? ctx->mime : gf_dm_sess_mime_type(ctx->sess), ctx->ext, ctx->block, nb_read, ctx->mime ? GF_TRUE : GF_FALSE, &ctx->pid);
360 			if (e) return e;
361 
362 			gf_filter_pid_set_property(ctx->pid, GF_PROP_PID_FILE_CACHED, &PROP_BOOL(GF_FALSE) );
363 
364 			if (!ctx->initial_ack_done) {
365 				ctx->initial_ack_done = GF_TRUE;
366 				gf_filter_pid_set_property(ctx->pid, GF_PROP_PID_DOWNLOAD_SESSION, &PROP_POINTER( (void*)ctx->sess ) );
367 			}
368 
369 			/*in test mode don't expose http headers (they contain date/version/etc)*/
370 			if (! gf_sys_is_test_mode()) {
371 				idx = 0;
372 				while (gf_dm_sess_enum_headers(ctx->sess, &idx, &hname, &hval) == GF_OK) {
373 					gf_filter_pid_set_property_dyn(ctx->pid, (char *) hname, & PROP_STRING(hval));
374 				}
375 			}
376 		}
377 
378 		gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_RATE, &PROP_UINT(8*bytes_per_sec) );
379 		if (ctx->range.num && ctx->file_size) {
380 			gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_BYTES, &PROP_LONGUINT(bytes_done + ctx->range.num) );
381 			gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_SIZE, &PROP_LONGUINT(ctx->file_size) );
382 		} else {
383 			gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_BYTES, &PROP_LONGUINT(bytes_done) );
384 			gf_filter_pid_set_info(ctx->pid, GF_PROP_PID_DOWN_SIZE, &PROP_LONGUINT(ctx->file_size ? ctx->file_size : bytes_done) );
385 		}
386 	}
387 
388 	byte_offset = ctx->nb_read;
389 
390 	ctx->nb_read += nb_read;
391 	if (ctx->file_size && (ctx->nb_read==ctx->file_size)) {
392 		ctx->is_end = GF_TRUE;
393 	} else if (e==GF_EOS) {
394 		ctx->is_end = GF_TRUE;
395 	}
396 
397 	pck = gf_filter_pck_new_shared(ctx->pid, ctx->block, nb_read, httpin_rel_pck);
398 	if (!pck) return GF_OK;
399 
400 	gf_filter_pck_set_cts(pck, 0);
401 
402 	gf_filter_pck_set_framing(pck, is_start, ctx->is_end);
403 	gf_filter_pck_set_sap(pck, GF_FILTER_SAP_1);
404 	gf_filter_pck_set_byte_offset(pck, byte_offset);
405 
406 	//mark packet out BEFORE sending, since the call to send() may destroy the packet if cloned
407 	ctx->pck_out = GF_TRUE;
408 	gf_filter_pck_send(pck);
409 
410 	if (ctx->file_size && gf_filter_reporting_enabled(filter)) {
411 		char szStatus[1024], *szSrc;
412 		szSrc = gf_file_basename(ctx->src);
413 
414 		sprintf(szStatus, "%s: % 16"LLD_SUF" /% 16"LLD_SUF" (%02.02f) % 8d kbps", szSrc, (s64) bytes_done, (s64) ctx->file_size, ((Double)bytes_done*100.0)/ctx->file_size, bytes_per_sec*8/1000);
415 		gf_filter_update_status(filter, (u32) (bytes_done*10000/ctx->file_size), szStatus);
416 	}
417 
418 	if (ctx->is_end) {
419 		const char *cached = gf_dm_sess_get_cache_name(ctx->sess);
420 		if (cached)
421 			gf_filter_pid_set_property(ctx->pid, GF_PROP_PID_FILE_CACHED, &PROP_BOOL(GF_TRUE) );
422 
423 		gf_filter_pid_set_eos(ctx->pid);
424 		return GF_EOS;
425 	}
426 
427 	return ctx->pck_out ? GF_EOS : GF_OK;
428 }
429 
430 
431 
432 #define OFFS(_n)	#_n, offsetof(GF_HTTPInCtx, _n)
433 
434 static const GF_FilterArgs HTTPInArgs[] =
435 {
436 	{ OFFS(src), "location of source content", GF_PROP_NAME, NULL, NULL, 0},
437 	{ OFFS(block_size), "block size used to read file", GF_PROP_UINT, "100000", NULL, GF_FS_ARG_HINT_ADVANCED},
438 	{ OFFS(cache), "set cache mode\n"
439 	"- disk: cache to disk,  discard once session is no longer used\n"
440 	"- disk: cache to disk and keep\n"
441 	"- mem: stores to memory, discard once session is no longer used\n"
442 	"- none: no cache", GF_PROP_UINT, "disk", "disk|keep|mem|none", GF_FS_ARG_HINT_ADVANCED},
443 	{ OFFS(range), "set byte range, as fraction", GF_PROP_FRACTION64, "0-0", NULL, 0},
444 	{ OFFS(ext), "override file extension", GF_PROP_NAME, NULL, NULL, 0},
445 	{ OFFS(mime), "set file mime type", GF_PROP_NAME, NULL, NULL, 0},
446 	{0}
447 };
448 
449 static const GF_FilterCapability HTTPInCaps[] =
450 {
451 	CAP_UINT(GF_CAPS_OUTPUT,  GF_PROP_PID_STREAM_TYPE, GF_STREAM_FILE),
452 };
453 
454 GF_FilterRegister HTTPInRegister = {
455 	.name = "httpin",
456 	GF_FS_SET_DESCRIPTION("HTTP input")
457 	GF_FS_SET_HELP("This filter dispatch raw blocks from a remote HTTP resource into a filter chain.\n"
458 	"Block size can be adjusted using [-block_size](), and disk caching policies can be adjusted.\n"
459 	"Content format can be forced through [-mime]() and file extension can be changed through [-ext]().\n"
460 	"Note: Unless disabled at session level (see [-no-probe](CORE) ), file extensions are usually ignored and format probing is done on the first data block.")
461 	.private_size = sizeof(GF_HTTPInCtx),
462 	.flags = GF_FS_REG_BLOCKING,
463 	.args = HTTPInArgs,
464 	SETCAPS(HTTPInCaps),
465 	.initialize = httpin_initialize,
466 	.finalize = httpin_finalize,
467 	.process = httpin_process,
468 	.process_event = httpin_process_event,
469 	.probe_url = httpin_probe_url
470 };
471 
472 
httpin_register(GF_FilterSession * session)473 const GF_FilterRegister *httpin_register(GF_FilterSession *session)
474 {
475 	return &HTTPInRegister;
476 }
477 
478