1 /*
2 * This file is part of mpv.
3 *
4 * mpv is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * mpv is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with mpv. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #include <float.h>
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <assert.h>
23 #include <unistd.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <stdint.h>
27
28 #include <math.h>
29
30 #include <sys/types.h>
31 #include <sys/stat.h>
32
33 #include "cache.h"
34 #include "config.h"
35 #include "options/m_config.h"
36 #include "options/m_option.h"
37 #include "mpv_talloc.h"
38 #include "common/av_common.h"
39 #include "common/msg.h"
40 #include "common/global.h"
41 #include "common/recorder.h"
42 #include "common/stats.h"
43 #include "misc/charset_conv.h"
44 #include "misc/thread_tools.h"
45 #include "osdep/atomic.h"
46 #include "osdep/timer.h"
47 #include "osdep/threads.h"
48
49 #include "stream/stream.h"
50 #include "demux.h"
51 #include "timeline.h"
52 #include "stheader.h"
53 #include "cue.h"
54
55 // Demuxer list
56 extern const struct demuxer_desc demuxer_desc_edl;
57 extern const struct demuxer_desc demuxer_desc_cue;
58 extern const demuxer_desc_t demuxer_desc_rawaudio;
59 extern const demuxer_desc_t demuxer_desc_rawvideo;
60 extern const demuxer_desc_t demuxer_desc_mf;
61 extern const demuxer_desc_t demuxer_desc_matroska;
62 extern const demuxer_desc_t demuxer_desc_lavf;
63 extern const demuxer_desc_t demuxer_desc_playlist;
64 extern const demuxer_desc_t demuxer_desc_disc;
65 extern const demuxer_desc_t demuxer_desc_rar;
66 extern const demuxer_desc_t demuxer_desc_libarchive;
67 extern const demuxer_desc_t demuxer_desc_null;
68 extern const demuxer_desc_t demuxer_desc_timeline;
69
70 static const demuxer_desc_t *const demuxer_list[] = {
71 &demuxer_desc_disc,
72 &demuxer_desc_edl,
73 &demuxer_desc_cue,
74 &demuxer_desc_rawaudio,
75 &demuxer_desc_rawvideo,
76 &demuxer_desc_matroska,
77 #if HAVE_LIBARCHIVE
78 &demuxer_desc_libarchive,
79 #endif
80 &demuxer_desc_lavf,
81 &demuxer_desc_mf,
82 &demuxer_desc_playlist,
83 &demuxer_desc_null,
84 NULL
85 };
86
87 struct demux_opts {
88 int enable_cache;
89 int disk_cache;
90 int64_t max_bytes;
91 int64_t max_bytes_bw;
92 int donate_fw;
93 double min_secs;
94 int force_seekable;
95 double min_secs_cache;
96 int access_references;
97 int seekable_cache;
98 int create_ccs;
99 char *record_file;
100 int video_back_preroll;
101 int audio_back_preroll;
102 int back_batch[STREAM_TYPE_COUNT];
103 double back_seek_size;
104 char *meta_cp;
105 int force_retry_eof;
106 };
107
108 #define OPT_BASE_STRUCT struct demux_opts
109
110 static bool get_demux_sub_opts(int index, const struct m_sub_options **sub);
111
112 const struct m_sub_options demux_conf = {
113 .opts = (const struct m_option[]){
114 {"cache", OPT_CHOICE(enable_cache,
115 {"no", 0}, {"auto", -1}, {"yes", 1})},
116 {"cache-on-disk", OPT_FLAG(disk_cache)},
117 {"demuxer-readahead-secs", OPT_DOUBLE(min_secs), M_RANGE(0, DBL_MAX)},
118 {"demuxer-max-bytes", OPT_BYTE_SIZE(max_bytes),
119 M_RANGE(0, M_MAX_MEM_BYTES)},
120 {"demuxer-max-back-bytes", OPT_BYTE_SIZE(max_bytes_bw),
121 M_RANGE(0, M_MAX_MEM_BYTES)},
122 {"demuxer-donate-buffer", OPT_FLAG(donate_fw)},
123 {"force-seekable", OPT_FLAG(force_seekable)},
124 {"cache-secs", OPT_DOUBLE(min_secs_cache), M_RANGE(0, DBL_MAX)},
125 {"access-references", OPT_FLAG(access_references)},
126 {"demuxer-seekable-cache", OPT_CHOICE(seekable_cache,
127 {"auto", -1}, {"no", 0}, {"yes", 1})},
128 {"sub-create-cc-track", OPT_FLAG(create_ccs)},
129 {"stream-record", OPT_STRING(record_file)},
130 {"video-backward-overlap", OPT_CHOICE(video_back_preroll, {"auto", -1}),
131 M_RANGE(0, 1024)},
132 {"audio-backward-overlap", OPT_CHOICE(audio_back_preroll, {"auto", -1}),
133 M_RANGE(0, 1024)},
134 {"video-backward-batch", OPT_INT(back_batch[STREAM_VIDEO]),
135 M_RANGE(0, 1024)},
136 {"audio-backward-batch", OPT_INT(back_batch[STREAM_AUDIO]),
137 M_RANGE(0, 1024)},
138 {"demuxer-backward-playback-step", OPT_DOUBLE(back_seek_size),
139 M_RANGE(0, DBL_MAX)},
140 {"metadata-codepage", OPT_STRING(meta_cp)},
141 {"demuxer-force-retry-on-eof", OPT_FLAG(force_retry_eof),
142 .deprecation_message = "temporary debug option, no replacement"},
143 {0}
144 },
145 .size = sizeof(struct demux_opts),
146 .defaults = &(const struct demux_opts){
147 .enable_cache = -1, // auto
148 .max_bytes = 150 * 1024 * 1024,
149 .max_bytes_bw = 50 * 1024 * 1024,
150 .donate_fw = 1,
151 .min_secs = 1.0,
152 .min_secs_cache = 1000.0 * 60 * 60,
153 .seekable_cache = -1,
154 .access_references = 1,
155 .video_back_preroll = -1,
156 .audio_back_preroll = -1,
157 .back_seek_size = 60,
158 .back_batch = {
159 [STREAM_VIDEO] = 1,
160 [STREAM_AUDIO] = 10,
161 },
162 .meta_cp = "utf-8",
163 },
164 .get_sub_options = get_demux_sub_opts,
165 };
166
// Internal demuxer state, shared between the user side and the demuxer
// implementation (which potentially runs in another thread).
struct demux_internal {
    struct mp_log *log;
    struct mpv_global *global;
    struct stats_ctx *stats;

    bool can_cache;             // not a slave demuxer; caching makes sense
    bool can_record;            // stream recording is allowed

    // Because the demuxer potentially runs in another thread, two demuxer
    // structs are kept; the real demuxer can access the shadow struct only.
    struct demuxer *d_thread;   // accessed by demuxer impl. (producer)
    struct demuxer *d_user;     // accessed by player (consumer)

    // The lock protects the packet queues (struct demux_stream) and the
    // fields below.
    pthread_mutex_t lock;
    pthread_cond_t wakeup;
    pthread_t thread;

    // -- All the following fields are protected by lock.

    struct demux_opts *opts;
    struct m_config_cache *opts_cache;

    bool thread_terminate;
    bool threading;
    bool shutdown_async;
    void (*wakeup_cb)(void *ctx);
    void *wakeup_cb_ctx;

    struct sh_stream **streams;
    int num_streams;

    char *meta_charset;

    // If non-NULL, the stream used for global (timed) metadata. This is an
    // arbitrary stream, which hopefully happens to work.
    struct sh_stream *metadata_stream;

    int events;

    struct demux_cache *cache;

    bool warned_queue_overflow;
    bool eof;                   // whether we're in EOF state
    double min_secs;
    size_t max_bytes;
    size_t max_bytes_bw;
    bool seekable_cache;
    bool using_network_cache_opts;
    char *record_filename;

    // Whether the demuxer thread should prefetch packets. Set to false if EOF
    // was reached or the demuxer cache is full. Also important for the
    // initial state: the decoder thread needs to select streams before the
    // first packet is read, so only packet reading sets this to true. It is
    // reset to false again on EOF or once prefetching is done.
    bool reading;

    // Set if a seek was just performed and no packets were read yet. Used to
    // avoid a redundant initial seek after enabling streams. It could simply
    // be allowed, but it is not, so that buggy seeking cannot affect normal
    // playback.
    bool after_seek;
    // Set in addition to after_seek if we think we seeked to the start of
    // the file (or if the demuxer was just opened).
    bool after_seek_to_start;

    // Demuxing backwards. Demuxer implementations don't support this
    // directly, so it is emulated by seeking backwards for every packet run.
    // Packets between keyframes are still demuxed forwards (they can't be
    // decoded otherwise), which adds complexity on top of it.
    bool back_demuxing;

    // For backward demuxing:
    bool need_back_seek;        // back-step seek needs to be triggered
    bool back_any_need_recheck; // at least 1 ds->back_need_recheck set

    bool tracks_switched;       // thread needs to inform demuxer of this

    bool seeking;               // there's a seek queued
    int seek_flags;             // flags for next seek (if seeking==true)
    double seek_pts;

    // (fields for debugging)
    double seeking_in_progress; // low level seek state
    int low_level_seeks;        // number of started low level seeks
    double demux_ts;            // last demuxed DTS or PTS

    double ts_offset;           // timestamp offset to apply to everything

    // (sorted by least recent use: index 0 is least recently used)
    struct demux_cached_range **ranges;
    int num_ranges;

    size_t total_bytes;         // total sum of packet data buffered
    // Range from which the decoder is reading, and to which the demuxer is
    // appending. Normally never NULL, and always ranges[num_ranges - 1].
    // It can be NULL during initialization or deinitialization.
    struct demux_cached_range *current_range;

    double highest_av_pts;      // highest non-subtitle PTS seen - for duration

    bool blocked;

    // Transient state.
    double duration;
    // Cached state.
    int64_t stream_size;
    int64_t last_speed_query;
    double speed_query_prev_sample;
    uint64_t bytes_per_second;
    int64_t next_cache_update;

    // demux user state (user thread, somewhat similar to reader/decoder state)
    double last_playback_pts;   // last playback_pts from demux_update()
    bool force_metadata_update;
    int cached_metadata_index;  // speed up repeated lookups

    struct mp_recorder *dumper;
    int dumper_status;

    bool owns_stream;

    // -- Access from demuxer thread only
    bool enable_recording;
    struct mp_recorder *recorder;
    int64_t slave_unbuffered_read_bytes; // value reported from demuxer impl.
    int64_t hack_unbuffered_read_bytes;  // for demux_get_bytes_read_hack()
    int64_t cache_unbuffered_read_bytes; // for demux_reader_state.bytes_per_second
    int64_t byte_level_seeks;            // for demux_reader_state.byte_level_seeks
};
298
// One set of tags together with the timestamp it is associated with.
struct timed_metadata {
    double pts;             // timestamp of this metadata entry
    struct mp_tags *tags;   // the tags themselves
    bool from_stream;       // NOTE(review): name suggests per-stream origin
                            // of the tags; confirm against the writers
};
304
// One contiguous range of cached packets covering all enabled streams.
// It holds one demux_queue per known stream.
struct demux_cached_range {
    // Per-stream queues; streams[] is indexed by demux_stream->index.
    struct demux_queue **streams;
    int num_streams;

    // Seekable time span, computed from the values of the stream queues.
    // Unlike the demux_queue fields of the same name, these are always
    // either NOPTS or fully valid.
    double seek_start;
    double seek_end;

    bool is_bof;    // set if the file begins with this range
    bool is_eof;    // set if the file ends with this range

    // Timed metadata entries belonging to this range.
    struct timed_metadata **metadata;
    int num_metadata;
};
322
// Mask for wrapping a position into the queue->index[] ring buffer. Usable as
// such because a non-zero index_size is a power of 2.
#define QUEUE_INDEX_SIZE_MASK(queue) ((queue)->index_size - 1)

// Yield the idx-th entry of the index of the given demux_queue, counted from
// the first valid entry (index0).
// Requirement: idx >= 0 && idx < queue->num_index
#define QUEUE_INDEX_ENTRY(queue, idx) \
    ((queue)->index[((queue)->index0 + (idx)) & QUEUE_INDEX_SIZE_MASK(queue)])

// Packets whose timestamps lie within this amount of time of the last index
// entry are not indexed (it's better to seek to them manually).
#define INDEX_STEP_SIZE 1.0
333
// A single entry of a demux_queue's seek index.
struct index_entry {
    double pts;                 // timestamp stored for this entry
    struct demux_packet *pkt;   // the packet this entry refers to
};
338
// Contiguous list of cached packets of a single stream within a single range;
// one of these exists per stream and range. It also carries some state used
// while demuxing (keeping it across seeks makes it easier to resume demuxing).
struct demux_queue {
    struct demux_stream *ds;            // stream this queue belongs to
    struct demux_cached_range *range;   // range this queue belongs to

    // Singly linked packet list (via demux_packet.next).
    struct demux_packet *head;
    struct demux_packet *tail;

    uint64_t tail_cum_pos;  // cumulative size including tail packet

    bool correct_dts;       // packet DTS is strictly monotonically increasing
    bool correct_pos;       // packet pos is strictly monotonically increasing
    int64_t last_pos;       // for determining correct_pos
    int64_t last_pos_fixup; // for filling in unset dp->pos values
    double last_dts;        // for determining correct_dts
    double last_ts;         // timestamp of the last packet added to queue

    // For incrementally determining the seek PTS range.
    struct demux_packet *keyframe_latest;
    struct demux_packet *keyframe_first;    // cached value of first KF packet

    // Incrementally maintained seek range; possibly invalid.
    double seek_start;
    double seek_end;
    double last_pruned;     // timestamp of last pruned keyframe

    bool is_bof;            // started demuxing at beginning of file
    bool is_eof;            // received true EOF here

    // Complete index, though it may skip some entries to reduce density.
    struct index_entry *index;  // ring buffer
    size_t index_size;          // size of index[] (0 or a power of 2)
    size_t index0;              // first index entry
    size_t num_index;           // number of index entries (wraps on index_size)
};
375
376 struct demux_stream {
377 struct demux_internal *in;
378 struct sh_stream *sh; // ds->sh->ds == ds
379 enum stream_type type; // equals to sh->type
380 int index; // equals to sh->index
381 // --- all fields are protected by in->lock
382
383 void (*wakeup_cb)(void *ctx);
384 void *wakeup_cb_ctx;
385
386 // demuxer state
387 bool selected; // user wants packets from this stream
388 bool eager; // try to keep at least 1 packet queued
389 // if false, this stream is disabled, or passively
390 // read (like subtitles)
391 bool still_image; // stream has still video images
392 bool refreshing; // finding old position after track switches
393 bool eof; // end of demuxed stream? (true if no more packets)
394
395 bool global_correct_dts;// all observed so far
396 bool global_correct_pos;
397
398 // current queue - used both for reading and demuxing (this is never NULL)
399 struct demux_queue *queue;
400
401 // reader (decoder) state (bitrate calculations are part of it because we
402 // want to return the bitrate closest to the "current position")
403 double base_ts; // timestamp of the last packet returned to decoder
404 double last_br_ts; // timestamp of last packet bitrate was calculated
405 size_t last_br_bytes; // summed packet sizes since last bitrate calculation
406 double bitrate;
407 struct demux_packet *reader_head; // points at current decoder position
408 bool skip_to_keyframe;
409 bool attached_picture_added;
410 bool need_wakeup; // call wakeup_cb on next reader_head state change
411 double force_read_until;// eager=false streams (subs): force read-ahead
412
413 // For demux_internal.dumper. Currently, this is used only temporarily
414 // during blocking dumping.
415 struct demux_packet *dump_pos;
416
417 // for refresh seeks: pos/dts of last packet returned to reader
418 int64_t last_ret_pos;
419 double last_ret_dts;
420
421 // Backwards demuxing.
422 bool back_need_recheck; // flag for incremental find_backward_restart_pos work
423 // pos/dts of the previous keyframe packet returned; always valid if back-
424 // demuxing is enabled, and back_restart_eof/back_restart_next are false.
425 int64_t back_restart_pos;
426 double back_restart_dts;
427 bool back_restart_eof; // restart position is at EOF; overrides pos/dts
428 bool back_restart_next; // restart before next keyframe; overrides above
429 bool back_restarting; // searching keyframe before restart pos
430 // Current PTS lower bound for back demuxing.
431 double back_seek_pos;
432 // pos/dts of the packet to resume demuxing from when another stream caused
433 // a seek backward to get more packets. reader_head will be reset to this
434 // packet as soon as it's encountered again.
435 int64_t back_resume_pos;
436 double back_resume_dts;
437 bool back_resuming; // resuming mode (above fields are valid/used)
438 // Set to true if the first packet (keyframe) of a range was returned.
439 bool back_range_started;
440 // Number of KF packets at start of range yet to return. -1 is used for BOF.
441 int back_range_count;
442 // Number of KF packets yet to return that are marked as preroll.
443 int back_range_preroll;
444 // Static packet preroll count.
445 int back_preroll;
446
447 // for closed captions (demuxer_feed_caption)
448 struct sh_stream *cc;
449 bool ignore_eof; // ignore stream in underrun detection
450 };
451
452 static void switch_to_fresh_cache_range(struct demux_internal *in);
453 static void demuxer_sort_chapters(demuxer_t *demuxer);
454 static void *demux_thread(void *pctx);
455 static void update_cache(struct demux_internal *in);
456 static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp);
457 static struct demux_packet *advance_reader_head(struct demux_stream *ds);
458 static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
459 bool clear_back_state);
460 static struct demux_packet *compute_keyframe_times(struct demux_packet *pkt,
461 double *out_kf_min,
462 double *out_kf_max);
463 static void find_backward_restart_pos(struct demux_stream *ds);
464 static struct demux_packet *find_seek_target(struct demux_queue *queue,
465 double pts, int flags);
466 static void prune_old_packets(struct demux_internal *in);
467 static void dumper_close(struct demux_internal *in);
468 static void demux_convert_tags_charset(struct demuxer *demuxer);
469
get_foward_buffered_bytes(struct demux_stream * ds)470 static uint64_t get_foward_buffered_bytes(struct demux_stream *ds)
471 {
472 if (!ds->reader_head)
473 return 0;
474 return ds->queue->tail_cum_pos - ds->reader_head->cum_pos;
475 }
476
#if 0
// Debugging aid, compiled out by default. Walks every cached range and queue,
// recomputes the redundant bookkeeping from scratch and asserts that it
// agrees with the incrementally maintained values. Very expensive.
static void check_queue_consistency(struct demux_internal *in)
{
    uint64_t sum_bytes = 0;

    assert(in->current_range && in->num_ranges > 0);
    assert(in->current_range == in->ranges[in->num_ranges - 1]);

    for (int r = 0; r < in->num_ranges; r++) {
        struct demux_cached_range *range = in->ranges[r];
        int packets_in_range = 0;

        assert(range->num_streams == in->num_streams);

        for (int s = 0; s < range->num_streams; s++) {
            struct demux_queue *queue = range->streams[s];
            struct demux_stream *ds = queue->ds;

            assert(queue->range == range);

            size_t fw_bytes = 0;
            bool reader_seen = false;       // reader_head reached in the walk
            bool latest_kf_seen = false;    // keyframe_latest is in the list
            bool first_kf_seen = false;     // keyframe_first is in the list
            size_t index_pos = 0;           // next index entry to match
            uint64_t queue_bytes = 0;

            for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
                if (dp == ds->reader_head)
                    reader_seen = true;
                if (dp == queue->keyframe_latest)
                    latest_kf_seen = true;
                if (dp == queue->keyframe_first)
                    first_kf_seen = true;

                size_t bytes = demux_packet_estimate_total_size(dp);
                sum_bytes += bytes;
                queue_bytes += bytes;

                // Everything from reader_head onwards counts as forward data,
                // which may exist only in the queue currently in use.
                if (reader_seen) {
                    fw_bytes += bytes;
                    assert(range == in->current_range);
                    assert(ds->queue == queue);
                }
                packets_in_range += 1;

                if (!dp->next)
                    assert(queue->tail == dp);

                // Index entries must show up in list order.
                if (index_pos < queue->num_index &&
                    QUEUE_INDEX_ENTRY(queue, index_pos).pkt == dp)
                    index_pos += 1;
            }
            if (!queue->head)
                assert(!queue->tail);
            assert(index_pos == queue->num_index);

            // The cumulative positions must describe the same byte count.
            uint64_t queue_bytes_cum = 0;
            if (queue->head)
                queue_bytes_cum = queue->tail_cum_pos - queue->head->cum_pos;

            assert(queue_bytes == queue_bytes_cum);

            // If the queue is currently used...
            if (ds->queue == queue) {
                // ...reader_head and others must be in the queue.
                assert(reader_seen == !!ds->reader_head);
                assert(latest_kf_seen == !!queue->keyframe_latest);
                uint64_t fw_bytes_cum = get_foward_buffered_bytes(ds);
                assert(fw_bytes == fw_bytes_cum);
            }

            assert(first_kf_seen == !!queue->keyframe_first);

            if (range != in->current_range) {
                assert(fw_bytes == 0);
            }

            if (queue->keyframe_latest)
                assert(queue->keyframe_latest->keyframe);

            // The index memory is accounted in total_bytes as well.
            sum_bytes += queue->index_size * sizeof(struct index_entry);
        }

        // Invariant needed by pruning; violation has worse effects than just
        // e.g. broken seeking due to incorrect seek ranges.
        if (range->seek_start != MP_NOPTS_VALUE)
            assert(packets_in_range > 0);
    }

    assert(in->total_bytes == sum_bytes);
}
#endif
565
566 // (this doesn't do most required things for a switch, like updating ds->queue)
set_current_range(struct demux_internal * in,struct demux_cached_range * range)567 static void set_current_range(struct demux_internal *in,
568 struct demux_cached_range *range)
569 {
570 in->current_range = range;
571
572 // Move to in->ranges[in->num_ranges-1] (for LRU sorting/invariant)
573 for (int n = 0; n < in->num_ranges; n++) {
574 if (in->ranges[n] == range) {
575 MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
576 break;
577 }
578 }
579 MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
580 }
581
prune_metadata(struct demux_cached_range * range)582 static void prune_metadata(struct demux_cached_range *range)
583 {
584 int first_needed = 0;
585
586 if (range->seek_start == MP_NOPTS_VALUE) {
587 first_needed = range->num_metadata;
588 } else {
589 for (int n = 0; n < range->num_metadata ; n++) {
590 if (range->metadata[n]->pts > range->seek_start)
591 break;
592 first_needed = n;
593 }
594 }
595
596 // Always preserve the last entry.
597 first_needed = MPMIN(first_needed, range->num_metadata - 1);
598
599 // (Could make this significantly more efficient for large first_needed,
600 // however that might be very rare and even then it might not matter.)
601 for (int n = 0; n < first_needed; n++) {
602 talloc_free(range->metadata[0]);
603 MP_TARRAY_REMOVE_AT(range->metadata, range->num_metadata, 0);
604 }
605 }
606
607 // Refresh range->seek_start/end. Idempotent.
update_seek_ranges(struct demux_cached_range * range)608 static void update_seek_ranges(struct demux_cached_range *range)
609 {
610 range->seek_start = range->seek_end = MP_NOPTS_VALUE;
611 range->is_bof = true;
612 range->is_eof = true;
613
614 double min_start_pts = MP_NOPTS_VALUE;
615 double max_end_pts = MP_NOPTS_VALUE;
616
617 for (int n = 0; n < range->num_streams; n++) {
618 struct demux_queue *queue = range->streams[n];
619
620 if (queue->ds->selected && queue->ds->eager) {
621 if (queue->is_bof) {
622 min_start_pts = MP_PTS_MIN(min_start_pts, queue->seek_start);
623 } else {
624 range->seek_start =
625 MP_PTS_MAX(range->seek_start, queue->seek_start);
626 }
627
628 if (queue->is_eof) {
629 max_end_pts = MP_PTS_MAX(max_end_pts, queue->seek_end);
630 } else {
631 range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end);
632 }
633
634 range->is_eof &= queue->is_eof;
635 range->is_bof &= queue->is_bof;
636
637 bool empty = queue->is_eof && !queue->head;
638 if (queue->seek_start >= queue->seek_end && !empty &&
639 !(queue->seek_start == queue->seek_end &&
640 queue->seek_start != MP_NOPTS_VALUE))
641 goto broken;
642 }
643 }
644
645 if (range->is_eof)
646 range->seek_end = max_end_pts;
647 if (range->is_bof)
648 range->seek_start = min_start_pts;
649
650 // Sparse (subtitle) stream behavior is not very clearly defined, but
651 // usually we don't want it to restrict the range of other streams. For
652 // example, if there are subtitle packets at position 5 and 10 seconds, and
653 // the demuxer demuxed the other streams until position 7 seconds, the seek
654 // range end position is 7.
655 // Assume that reading a non-sparse (audio/video) packet gets all sparse
656 // packets that are needed before that non-sparse packet.
657 // This is incorrect in any of these cases:
658 // - sparse streams only (it's unknown how to determine an accurate range)
659 // - if sparse streams have non-keyframe packets (we set queue->last_pruned
660 // to the start of the pruned keyframe range - we'd need the end or so)
661 // We also assume that ds->eager equals to a stream not being sparse
662 // (usually true, except if only sparse streams are selected).
663 // We also rely on the fact that the demuxer position will always be ahead
664 // of the seek_end for audio/video, because they need to prefetch at least
665 // 1 packet to detect the end of a keyframe range. This means that there's
666 // a relatively high guarantee to have all sparse (subtitle) packets within
667 // the seekable range.
668 // As a consequence, the code _never_ checks queue->seek_end for a sparse
669 // queue, as the end of it is implied by the highest PTS of a non-sparse
670 // stream (i.e. the latest demuxer position).
671 // On the other hand, if a sparse packet was pruned, and that packet has
672 // a higher PTS than seek_start for non-sparse queues, that packet is
673 // missing. So the range's seek_start needs to be adjusted accordingly.
674 for (int n = 0; n < range->num_streams; n++) {
675 struct demux_queue *queue = range->streams[n];
676 if (queue->ds->selected && !queue->ds->eager &&
677 queue->last_pruned != MP_NOPTS_VALUE &&
678 range->seek_start != MP_NOPTS_VALUE)
679 {
680 // (last_pruned is _exclusive_ to the seekable range, so add a small
681 // value to exclude it from the valid range.)
682 range->seek_start =
683 MP_PTS_MAX(range->seek_start, queue->last_pruned + 0.1);
684 }
685 }
686
687 if (range->seek_start >= range->seek_end)
688 goto broken;
689
690 prune_metadata(range);
691 return;
692
693 broken:
694 range->seek_start = range->seek_end = MP_NOPTS_VALUE;
695 prune_metadata(range);
696 }
697
698 // Remove queue->head from the queue.
remove_head_packet(struct demux_queue * queue)699 static void remove_head_packet(struct demux_queue *queue)
700 {
701 struct demux_packet *dp = queue->head;
702
703 assert(queue->ds->reader_head != dp);
704 if (queue->keyframe_first == dp)
705 queue->keyframe_first = NULL;
706 if (queue->keyframe_latest == dp)
707 queue->keyframe_latest = NULL;
708 queue->is_bof = false;
709
710 uint64_t end_pos = dp->next ? dp->next->cum_pos : queue->tail_cum_pos;
711 queue->ds->in->total_bytes -= end_pos - dp->cum_pos;
712
713 if (queue->num_index && queue->index[queue->index0].pkt == dp) {
714 queue->index0 = (queue->index0 + 1) & QUEUE_INDEX_SIZE_MASK(queue);
715 queue->num_index -= 1;
716 }
717
718 queue->head = dp->next;
719 if (!queue->head)
720 queue->tail = NULL;
721
722 talloc_free(dp);
723 }
724
free_index(struct demux_queue * queue)725 static void free_index(struct demux_queue *queue)
726 {
727 struct demux_stream *ds = queue->ds;
728 struct demux_internal *in = ds->in;
729
730 in->total_bytes -= queue->index_size * sizeof(queue->index[0]);
731 queue->index_size = 0;
732 queue->index0 = 0;
733 queue->num_index = 0;
734 TA_FREEP(&queue->index);
735 }
736
clear_queue(struct demux_queue * queue)737 static void clear_queue(struct demux_queue *queue)
738 {
739 struct demux_stream *ds = queue->ds;
740 struct demux_internal *in = ds->in;
741
742 if (queue->head)
743 in->total_bytes -= queue->tail_cum_pos - queue->head->cum_pos;
744
745 free_index(queue);
746
747 struct demux_packet *dp = queue->head;
748 while (dp) {
749 struct demux_packet *dn = dp->next;
750 assert(ds->reader_head != dp);
751 talloc_free(dp);
752 dp = dn;
753 }
754 queue->head = queue->tail = NULL;
755 queue->keyframe_first = NULL;
756 queue->keyframe_latest = NULL;
757 queue->seek_start = queue->seek_end = queue->last_pruned = MP_NOPTS_VALUE;
758
759 queue->correct_dts = queue->correct_pos = true;
760 queue->last_pos = -1;
761 queue->last_ts = queue->last_dts = MP_NOPTS_VALUE;
762 queue->last_pos_fixup = -1;
763
764 queue->is_eof = false;
765 queue->is_bof = false;
766 }
767
clear_cached_range(struct demux_internal * in,struct demux_cached_range * range)768 static void clear_cached_range(struct demux_internal *in,
769 struct demux_cached_range *range)
770 {
771 for (int n = 0; n < range->num_streams; n++)
772 clear_queue(range->streams[n]);
773
774 for (int n = 0; n < range->num_metadata; n++)
775 talloc_free(range->metadata[n]);
776 range->num_metadata = 0;
777
778 update_seek_ranges(range);
779 }
780
781 // Remove ranges with no data (except in->current_range). Also remove excessive
782 // ranges.
free_empty_cached_ranges(struct demux_internal * in)783 static void free_empty_cached_ranges(struct demux_internal *in)
784 {
785 while (1) {
786 struct demux_cached_range *worst = NULL;
787
788 int end = in->num_ranges - 1;
789
790 // (Not set during early init or late destruction.)
791 if (in->current_range) {
792 assert(in->current_range && in->num_ranges > 0);
793 assert(in->current_range == in->ranges[in->num_ranges - 1]);
794 end -= 1;
795 }
796
797 for (int n = end; n >= 0; n--) {
798 struct demux_cached_range *range = in->ranges[n];
799 if (range->seek_start == MP_NOPTS_VALUE || !in->seekable_cache) {
800 clear_cached_range(in, range);
801 MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
802 for (int i = 0; i < range->num_streams; i++)
803 talloc_free(range->streams[i]);
804 talloc_free(range);
805 } else {
806 if (!worst || (range->seek_end - range->seek_start <
807 worst->seek_end - worst->seek_start))
808 worst = range;
809 }
810 }
811
812 if (in->num_ranges <= MAX_SEEK_RANGES || !worst)
813 break;
814
815 clear_cached_range(in, worst);
816 }
817 }
818
ds_clear_reader_queue_state(struct demux_stream * ds)819 static void ds_clear_reader_queue_state(struct demux_stream *ds)
820 {
821 ds->reader_head = NULL;
822 ds->eof = false;
823 ds->need_wakeup = true;
824 }
825
ds_clear_reader_state(struct demux_stream * ds,bool clear_back_state)826 static void ds_clear_reader_state(struct demux_stream *ds,
827 bool clear_back_state)
828 {
829 ds_clear_reader_queue_state(ds);
830
831 ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE;
832 ds->last_br_bytes = 0;
833 ds->bitrate = -1;
834 ds->skip_to_keyframe = false;
835 ds->attached_picture_added = false;
836 ds->last_ret_pos = -1;
837 ds->last_ret_dts = MP_NOPTS_VALUE;
838 ds->force_read_until = MP_NOPTS_VALUE;
839
840 if (clear_back_state) {
841 ds->back_restart_pos = -1;
842 ds->back_restart_dts = MP_NOPTS_VALUE;
843 ds->back_restart_eof = false;
844 ds->back_restart_next = ds->in->back_demuxing;
845 ds->back_restarting = ds->in->back_demuxing && ds->eager;
846 ds->back_seek_pos = MP_NOPTS_VALUE;
847 ds->back_resume_pos = -1;
848 ds->back_resume_dts = MP_NOPTS_VALUE;
849 ds->back_resuming = false;
850 ds->back_range_started = false;
851 ds->back_range_count = 0;
852 ds->back_range_preroll = 0;
853 }
854 }
855
856 // called locked, from user thread only
clear_reader_state(struct demux_internal * in,bool clear_back_state)857 static void clear_reader_state(struct demux_internal *in,
858 bool clear_back_state)
859 {
860 for (int n = 0; n < in->num_streams; n++)
861 ds_clear_reader_state(in->streams[n]->ds, clear_back_state);
862 in->warned_queue_overflow = false;
863 in->d_user->filepos = -1; // implicitly synchronized
864 in->blocked = false;
865 in->need_back_seek = false;
866 }
867
868 // Call if the observed reader state on this stream somehow changes. The wakeup
869 // is skipped if the reader successfully read a packet, because that means we
870 // expect it to come back and ask for more.
wakeup_ds(struct demux_stream * ds)871 static void wakeup_ds(struct demux_stream *ds)
872 {
873 if (ds->need_wakeup) {
874 if (ds->wakeup_cb) {
875 ds->wakeup_cb(ds->wakeup_cb_ctx);
876 } else if (ds->in->wakeup_cb) {
877 ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
878 }
879 ds->need_wakeup = false;
880 pthread_cond_signal(&ds->in->wakeup);
881 }
882 }
883
update_stream_selection_state(struct demux_internal * in,struct demux_stream * ds)884 static void update_stream_selection_state(struct demux_internal *in,
885 struct demux_stream *ds)
886 {
887 ds->eof = false;
888 ds->refreshing = false;
889
890 // We still have to go over the whole stream list to update ds->eager for
891 // other streams too, because they depend on other stream's selections.
892
893 bool any_av_streams = false;
894 bool any_streams = false;
895
896 for (int n = 0; n < in->num_streams; n++) {
897 struct demux_stream *s = in->streams[n]->ds;
898
899 s->still_image = s->sh->still_image;
900 s->eager = s->selected && !s->sh->attached_picture;
901 if (s->eager && !s->still_image)
902 any_av_streams |= s->type != STREAM_SUB;
903 any_streams |= s->selected;
904 }
905
906 // Subtitles are only eagerly read if there are no other eagerly read
907 // streams.
908 if (any_av_streams) {
909 for (int n = 0; n < in->num_streams; n++) {
910 struct demux_stream *s = in->streams[n]->ds;
911
912 if (s->type == STREAM_SUB)
913 s->eager = false;
914 }
915 }
916
917 if (!any_streams)
918 in->blocked = false;
919
920 ds_clear_reader_state(ds, true);
921
922 // Make sure any stream reselection or addition is reflected in the seek
923 // ranges, and also get rid of data that is not needed anymore (or
924 // rather, which can't be kept consistent). This has to happen after we've
925 // updated all the subtle state (like s->eager).
926 for (int n = 0; n < in->num_ranges; n++) {
927 struct demux_cached_range *range = in->ranges[n];
928
929 if (!ds->selected)
930 clear_queue(range->streams[ds->index]);
931
932 update_seek_ranges(range);
933 }
934
935 free_empty_cached_ranges(in);
936
937 wakeup_ds(ds);
938 }
939
demux_set_ts_offset(struct demuxer * demuxer,double offset)940 void demux_set_ts_offset(struct demuxer *demuxer, double offset)
941 {
942 struct demux_internal *in = demuxer->in;
943 pthread_mutex_lock(&in->lock);
944 in->ts_offset = offset;
945 pthread_mutex_unlock(&in->lock);
946 }
947
add_missing_streams(struct demux_internal * in,struct demux_cached_range * range)948 static void add_missing_streams(struct demux_internal *in,
949 struct demux_cached_range *range)
950 {
951 for (int n = range->num_streams; n < in->num_streams; n++) {
952 struct demux_stream *ds = in->streams[n]->ds;
953
954 struct demux_queue *queue = talloc_ptrtype(NULL, queue);
955 *queue = (struct demux_queue){
956 .ds = ds,
957 .range = range,
958 };
959 clear_queue(queue);
960 MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
961 assert(range->streams[ds->index] == queue);
962 }
963 }
964
965 // Allocate a new sh_stream of the given type. It either has to be released
966 // with talloc_free(), or added to a demuxer with demux_add_sh_stream(). You
967 // cannot add or read packets from the stream before it has been added.
968 // type may be changed later, but only before demux_add_sh_stream().
demux_alloc_sh_stream(enum stream_type type)969 struct sh_stream *demux_alloc_sh_stream(enum stream_type type)
970 {
971 struct sh_stream *sh = talloc_ptrtype(NULL, sh);
972 *sh = (struct sh_stream) {
973 .type = type,
974 .index = -1,
975 .ff_index = -1, // may be overwritten by demuxer
976 .demuxer_id = -1, // ... same
977 .codec = talloc_zero(sh, struct mp_codec_params),
978 .tags = talloc_zero(sh, struct mp_tags),
979 };
980 sh->codec->type = type;
981 return sh;
982 }
983
984 // Add a new sh_stream to the demuxer. Note that as soon as the stream has been
985 // added, it must be immutable, and must not be released (this will happen when
986 // the demuxer is destroyed).
demux_add_sh_stream_locked(struct demux_internal * in,struct sh_stream * sh)987 static void demux_add_sh_stream_locked(struct demux_internal *in,
988 struct sh_stream *sh)
989 {
990 assert(!sh->ds); // must not be added yet
991
992 sh->index = in->num_streams;
993
994 sh->ds = talloc(sh, struct demux_stream);
995 *sh->ds = (struct demux_stream) {
996 .in = in,
997 .sh = sh,
998 .type = sh->type,
999 .index = sh->index,
1000 .global_correct_dts = true,
1001 .global_correct_pos = true,
1002 };
1003
1004 struct demux_stream *ds = sh->ds;
1005
1006 if (!sh->codec->codec)
1007 sh->codec->codec = "";
1008
1009 if (sh->ff_index < 0)
1010 sh->ff_index = sh->index;
1011
1012 MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
1013 assert(in->streams[sh->index] == sh);
1014
1015 if (in->current_range) {
1016 for (int n = 0; n < in->num_ranges; n++)
1017 add_missing_streams(in, in->ranges[n]);
1018
1019 sh->ds->queue = in->current_range->streams[sh->ds->index];
1020 }
1021
1022 update_stream_selection_state(in, sh->ds);
1023
1024 switch (ds->type) {
1025 case STREAM_AUDIO:
1026 ds->back_preroll = in->opts->audio_back_preroll;
1027 if (ds->back_preroll < 0) { // auto
1028 ds->back_preroll = mp_codec_is_lossless(sh->codec->codec) ? 0 : 1;
1029 if (sh->codec->codec && (strcmp(sh->codec->codec, "opus") == 0 ||
1030 strcmp(sh->codec->codec, "vorbis") == 0 ||
1031 strcmp(sh->codec->codec, "mp3") == 0))
1032 ds->back_preroll = 2;
1033 }
1034 break;
1035 case STREAM_VIDEO:
1036 ds->back_preroll = in->opts->video_back_preroll;
1037 if (ds->back_preroll < 0)
1038 ds->back_preroll = 0; // auto
1039 break;
1040 }
1041
1042 if (!ds->sh->attached_picture) {
1043 // Typically this is used for webradio, so any stream will do.
1044 if (!in->metadata_stream)
1045 in->metadata_stream = sh;
1046 }
1047
1048 in->events |= DEMUX_EVENT_STREAMS;
1049 if (in->wakeup_cb)
1050 in->wakeup_cb(in->wakeup_cb_ctx);
1051 }
1052
1053 // For demuxer implementations only.
demux_add_sh_stream(struct demuxer * demuxer,struct sh_stream * sh)1054 void demux_add_sh_stream(struct demuxer *demuxer, struct sh_stream *sh)
1055 {
1056 struct demux_internal *in = demuxer->in;
1057 assert(demuxer == in->d_thread);
1058 pthread_mutex_lock(&in->lock);
1059 demux_add_sh_stream_locked(in, sh);
1060 pthread_mutex_unlock(&in->lock);
1061 }
1062
1063 // Return a stream with the given index. Since streams can only be added during
1064 // the lifetime of the demuxer, it is guaranteed that an index within the valid
1065 // range [0, demux_get_num_stream()) always returns a valid sh_stream pointer,
1066 // which will be valid until the demuxer is destroyed.
demux_get_stream(struct demuxer * demuxer,int index)1067 struct sh_stream *demux_get_stream(struct demuxer *demuxer, int index)
1068 {
1069 struct demux_internal *in = demuxer->in;
1070 pthread_mutex_lock(&in->lock);
1071 assert(index >= 0 && index < in->num_streams);
1072 struct sh_stream *r = in->streams[index];
1073 pthread_mutex_unlock(&in->lock);
1074 return r;
1075 }
1076
1077 // See demux_get_stream().
demux_get_num_stream(struct demuxer * demuxer)1078 int demux_get_num_stream(struct demuxer *demuxer)
1079 {
1080 struct demux_internal *in = demuxer->in;
1081 pthread_mutex_lock(&in->lock);
1082 int r = in->num_streams;
1083 pthread_mutex_unlock(&in->lock);
1084 return r;
1085 }
1086
1087 // It's UB to call anything but demux_dealloc() on the demuxer after this.
demux_shutdown(struct demux_internal * in)1088 static void demux_shutdown(struct demux_internal *in)
1089 {
1090 struct demuxer *demuxer = in->d_user;
1091
1092 if (in->recorder) {
1093 mp_recorder_destroy(in->recorder);
1094 in->recorder = NULL;
1095 }
1096
1097 dumper_close(in);
1098
1099 if (demuxer->desc->close)
1100 demuxer->desc->close(in->d_thread);
1101 demuxer->priv = NULL;
1102 in->d_thread->priv = NULL;
1103
1104 demux_flush(demuxer);
1105 assert(in->total_bytes == 0);
1106
1107 in->current_range = NULL;
1108 free_empty_cached_ranges(in);
1109
1110 talloc_free(in->cache);
1111 in->cache = NULL;
1112
1113 if (in->owns_stream)
1114 free_stream(demuxer->stream);
1115 demuxer->stream = NULL;
1116 }
1117
demux_dealloc(struct demux_internal * in)1118 static void demux_dealloc(struct demux_internal *in)
1119 {
1120 for (int n = 0; n < in->num_streams; n++)
1121 talloc_free(in->streams[n]);
1122 pthread_mutex_destroy(&in->lock);
1123 pthread_cond_destroy(&in->wakeup);
1124 talloc_free(in->d_user);
1125 }
1126
demux_free(struct demuxer * demuxer)1127 void demux_free(struct demuxer *demuxer)
1128 {
1129 if (!demuxer)
1130 return;
1131 struct demux_internal *in = demuxer->in;
1132 assert(demuxer == in->d_user);
1133
1134 demux_stop_thread(demuxer);
1135 demux_shutdown(in);
1136 demux_dealloc(in);
1137 }
1138
1139 // Start closing the demuxer and eventually freeing the demuxer asynchronously.
1140 // You must not access the demuxer once this has been started. Once the demuxer
1141 // is shutdown, the wakeup callback is invoked. Then you need to call
1142 // demux_free_async_finish() to end the operation (it must not be called from
1143 // the wakeup callback).
1144 // This can return NULL. Then the demuxer cannot be free'd asynchronously, and
1145 // you need to call demux_free() instead.
demux_free_async(struct demuxer * demuxer)1146 struct demux_free_async_state *demux_free_async(struct demuxer *demuxer)
1147 {
1148 struct demux_internal *in = demuxer->in;
1149 assert(demuxer == in->d_user);
1150
1151 if (!in->threading)
1152 return NULL;
1153
1154 pthread_mutex_lock(&in->lock);
1155 in->thread_terminate = true;
1156 in->shutdown_async = true;
1157 pthread_cond_signal(&in->wakeup);
1158 pthread_mutex_unlock(&in->lock);
1159
1160 return (struct demux_free_async_state *)demuxer->in; // lies
1161 }
1162
1163 // As long as state is valid, you can call this to request immediate abort.
1164 // Roughly behaves as demux_cancel_and_free(), except you still need to wait
1165 // for the result.
demux_free_async_force(struct demux_free_async_state * state)1166 void demux_free_async_force(struct demux_free_async_state *state)
1167 {
1168 struct demux_internal *in = (struct demux_internal *)state; // reverse lies
1169
1170 mp_cancel_trigger(in->d_user->cancel);
1171 }
1172
1173 // Check whether the demuxer is shutdown yet. If not, return false, and you
1174 // need to call this again in the future (preferably after you were notified by
1175 // the wakeup callback). If yes, deallocate all state, and return true (in
1176 // particular, the state ptr becomes invalid, and the wakeup callback will never
1177 // be called again).
demux_free_async_finish(struct demux_free_async_state * state)1178 bool demux_free_async_finish(struct demux_free_async_state *state)
1179 {
1180 struct demux_internal *in = (struct demux_internal *)state; // reverse lies
1181
1182 pthread_mutex_lock(&in->lock);
1183 bool busy = in->shutdown_async;
1184 pthread_mutex_unlock(&in->lock);
1185
1186 if (busy)
1187 return false;
1188
1189 demux_stop_thread(in->d_user);
1190 demux_dealloc(in);
1191 return true;
1192 }
1193
1194 // Like demux_free(), but trigger an abort, which will force the demuxer to
1195 // terminate immediately. If this wasn't opened with demux_open_url(), there is
1196 // some chance this will accidentally abort other things via demuxer->cancel.
demux_cancel_and_free(struct demuxer * demuxer)1197 void demux_cancel_and_free(struct demuxer *demuxer)
1198 {
1199 if (!demuxer)
1200 return;
1201 mp_cancel_trigger(demuxer->cancel);
1202 demux_free(demuxer);
1203 }
1204
1205 // Start the demuxer thread, which reads ahead packets on its own.
demux_start_thread(struct demuxer * demuxer)1206 void demux_start_thread(struct demuxer *demuxer)
1207 {
1208 struct demux_internal *in = demuxer->in;
1209 assert(demuxer == in->d_user);
1210
1211 if (!in->threading) {
1212 in->threading = true;
1213 if (pthread_create(&in->thread, NULL, demux_thread, in))
1214 in->threading = false;
1215 }
1216 }
1217
demux_stop_thread(struct demuxer * demuxer)1218 void demux_stop_thread(struct demuxer *demuxer)
1219 {
1220 struct demux_internal *in = demuxer->in;
1221 assert(demuxer == in->d_user);
1222
1223 if (in->threading) {
1224 pthread_mutex_lock(&in->lock);
1225 in->thread_terminate = true;
1226 pthread_cond_signal(&in->wakeup);
1227 pthread_mutex_unlock(&in->lock);
1228 pthread_join(in->thread, NULL);
1229 in->threading = false;
1230 in->thread_terminate = false;
1231 }
1232 }
1233
1234 // The demuxer thread will call cb(ctx) if there's a new packet, or EOF is reached.
demux_set_wakeup_cb(struct demuxer * demuxer,void (* cb)(void * ctx),void * ctx)1235 void demux_set_wakeup_cb(struct demuxer *demuxer, void (*cb)(void *ctx), void *ctx)
1236 {
1237 struct demux_internal *in = demuxer->in;
1238 pthread_mutex_lock(&in->lock);
1239 in->wakeup_cb = cb;
1240 in->wakeup_cb_ctx = ctx;
1241 pthread_mutex_unlock(&in->lock);
1242 }
1243
demux_start_prefetch(struct demuxer * demuxer)1244 void demux_start_prefetch(struct demuxer *demuxer)
1245 {
1246 struct demux_internal *in = demuxer->in;
1247 assert(demuxer == in->d_user);
1248
1249 pthread_mutex_lock(&in->lock);
1250 in->reading = true;
1251 pthread_cond_signal(&in->wakeup);
1252 pthread_mutex_unlock(&in->lock);
1253 }
1254
stream_type_name(enum stream_type type)1255 const char *stream_type_name(enum stream_type type)
1256 {
1257 switch (type) {
1258 case STREAM_VIDEO: return "video";
1259 case STREAM_AUDIO: return "audio";
1260 case STREAM_SUB: return "sub";
1261 default: return "unknown";
1262 }
1263 }
1264
demuxer_get_cc_track_locked(struct sh_stream * stream)1265 static struct sh_stream *demuxer_get_cc_track_locked(struct sh_stream *stream)
1266 {
1267 struct sh_stream *sh = stream->ds->cc;
1268
1269 if (!sh) {
1270 sh = demux_alloc_sh_stream(STREAM_SUB);
1271 if (!sh)
1272 return NULL;
1273 sh->codec->codec = "eia_608";
1274 sh->default_track = true;
1275 stream->ds->cc = sh;
1276 demux_add_sh_stream_locked(stream->ds->in, sh);
1277 sh->ds->ignore_eof = true;
1278 }
1279
1280 return sh;
1281 }
1282
// Queue a closed-caption packet that was extracted from stream. The packet is
// marked as keyframe, has the demuxer's ts_offset removed from pts/dts, is
// retargeted to the CC track, and is then passed to add_packet_locked().
// If no CC track is available, the packet is freed instead.
void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp)
{
    struct demux_internal *in = stream->ds->in;

    pthread_mutex_lock(&in->lock);

    struct sh_stream *cc = demuxer_get_cc_track_locked(stream);
    bool have_track = cc != NULL;

    if (have_track) {
        dp->keyframe = true;
        dp->pts = MP_ADD_PTS(dp->pts, -in->ts_offset);
        dp->dts = MP_ADD_PTS(dp->dts, -in->ts_offset);
        dp->stream = cc->index;
        add_packet_locked(cc, dp);
    }

    pthread_mutex_unlock(&in->lock);

    // Without a track nobody took the packet; release it outside the lock.
    if (!have_track)
        talloc_free(dp);
}
1302
error_on_backward_demuxing(struct demux_internal * in)1303 static void error_on_backward_demuxing(struct demux_internal *in)
1304 {
1305 if (!in->back_demuxing)
1306 return;
1307 MP_ERR(in, "Disabling backward demuxing.\n");
1308 in->back_demuxing = false;
1309 clear_reader_state(in, true);
1310 }
1311
perform_backward_seek(struct demux_internal * in)1312 static void perform_backward_seek(struct demux_internal *in)
1313 {
1314 double target = MP_NOPTS_VALUE;
1315
1316 for (int n = 0; n < in->num_streams; n++) {
1317 struct demux_stream *ds = in->streams[n]->ds;
1318
1319 if (ds->reader_head && !ds->back_restarting && !ds->back_resuming &&
1320 ds->eager)
1321 {
1322 ds->back_resuming = true;
1323 ds->back_resume_pos = ds->reader_head->pos;
1324 ds->back_resume_dts = ds->reader_head->dts;
1325 }
1326
1327 target = MP_PTS_MIN(target, ds->back_seek_pos);
1328 }
1329
1330 target = MP_PTS_OR_DEF(target, in->d_thread->start_time);
1331
1332 MP_VERBOSE(in, "triggering backward seek to get more packets\n");
1333 queue_seek(in, target, SEEK_SATAN | SEEK_HR, false);
1334 in->reading = true;
1335
1336 // Don't starve other threads.
1337 pthread_mutex_unlock(&in->lock);
1338 pthread_mutex_lock(&in->lock);
1339 }
1340
1341 // For incremental backward demuxing search work.
check_backward_seek(struct demux_internal * in)1342 static void check_backward_seek(struct demux_internal *in)
1343 {
1344 in->back_any_need_recheck = false;
1345
1346 for (int n = 0; n < in->num_streams; n++) {
1347 struct demux_stream *ds = in->streams[n]->ds;
1348
1349 if (ds->back_need_recheck)
1350 find_backward_restart_pos(ds);
1351 }
1352 }
1353
// Search for a packet to resume demuxing from.
// The implementation of this function is quite awkward, because the packet
// queue is a singly linked list without back links, while it needs to search
// backwards.
// This is the core of backward demuxing.
//
// Only does something while ds->back_restarting is set. Possible outcomes:
//  - not enough data queued yet: return and wait to be called again;
//  - a resume target is found: ds->reader_head is advanced to it, the
//    back_range_* fields and the next back_restart_dts/pos are set up, and
//    the reader is woken;
//  - beginning of file reached: back_range_count is set to -1 and the reader
//    is woken;
//  - the needed packets lie before what is queued: jump to resume_earlier,
//    which either repositions the reader head inside the queue or sets
//    in->need_back_seek;
//  - the demuxer misbehaves: backward demuxing is disabled.
static void find_backward_restart_pos(struct demux_stream *ds)
{
    struct demux_internal *in = ds->in;

    ds->back_need_recheck = false;
    if (!ds->back_restarting)
        return;

    // The search window is [reader_head, queue tail].
    struct demux_packet *first = ds->reader_head;
    struct demux_packet *last = ds->queue->tail;

    if (first && !first->keyframe)
        MP_WARN(in, "Queue not starting on keyframe.\n");

    // Packet at back_restart_pos. (Note: we don't actually need it, only the
    // packet immediately before it. But same effort.)
    // If this is NULL, look for EOF (resume from very last keyframe).
    struct demux_packet *back_restart = NULL;

    if (ds->back_restart_next) {
        // Initial state. Switch to one of the other modes.

        for (struct demux_packet *cur = first; cur; cur = cur->next) {
            // Restart for next keyframe after reader_head.
            if (cur != first && cur->keyframe) {
                ds->back_restart_dts = cur->dts;
                ds->back_restart_pos = cur->pos;
                ds->back_restart_eof = false;
                ds->back_restart_next = false;
                break;
            }
        }

        if (ds->back_restart_next && ds->eof) {
            // Restart from end if nothing was found.
            ds->back_restart_eof = true;
            ds->back_restart_next = false;
        }

        // Still undecided: neither a following keyframe nor EOF was seen yet.
        if (ds->back_restart_next)
            return;
    }

    if (ds->back_restart_eof) {
        // We're trying to find EOF (without discarding packets). Only continue
        // if we really reach EOF.
        if (!ds->eof)
            return;
    } else if (!first && ds->eof) {
        // Reached EOF during normal backward demuxing. We probably returned the
        // last keyframe range to user. Need to resume at an earlier position.
        // Fall through, hit the no-keyframe case (and possibly the BOF check
        // if there are no packets at all), and then resume_earlier.
    } else if (!first) {
        return; // no packets yet
    } else {
        assert(last);

        // Comparisons only use whichever of dts/pos is flagged as globally
        // correct for this stream.
        if ((ds->global_correct_dts && last->dts < ds->back_restart_dts) ||
            (ds->global_correct_pos && last->pos < ds->back_restart_pos))
            return; // restart pos not reached yet

        // The target we're searching for is apparently before the start of the
        // queue.
        if ((ds->global_correct_dts && first->dts > ds->back_restart_dts) ||
            (ds->global_correct_pos && first->pos > ds->back_restart_pos))
            goto resume_earlier; // current position is too late; seek back


        // Locate the packet that exactly matches the recorded restart point.
        for (struct demux_packet *cur = first; cur; cur = cur->next) {
            if ((ds->global_correct_dts && cur->dts == ds->back_restart_dts) ||
                (ds->global_correct_pos && cur->pos == ds->back_restart_pos))
            {
                back_restart = cur;
                break;
            }
        }

        if (!back_restart) {
            // The packet should have been in the searched range; maybe dts/pos
            // determinism assumptions were broken.
            MP_ERR(in, "Demuxer not cooperating.\n");
            error_on_backward_demuxing(in);
            return;
        }
    }

    // Find where to restart demuxing. It's usually the last keyframe packet
    // before restart_pos, but might be up to back_preroll + batch keyframe
    // packets earlier.

    // (Normally, we'd just iterate backwards, but no back links.)
    // Count the keyframes in [first, back_restart) and remember the last one.
    // With back_restart == NULL this walks to the end of the list.
    int num_kf = 0;
    struct demux_packet *pre_1 = NULL; // idiotic "optimization" for total=1
    for (struct demux_packet *dp = first; dp != back_restart; dp = dp->next) {
        if (dp->keyframe) {
            num_kf++;
            pre_1 = dp;
        }
    }

    // Number of renderable keyframes to return to user.
    // (Excludes preroll, which is decoded by user, but then discarded.)
    int batch = MPMAX(in->opts->back_batch[ds->type], 1);
    // Number of keyframes to return to the user in total.
    int total = batch + ds->back_preroll;

    assert(total >= 1);

    // Treated as beginning-of-file only if the queue is flagged is_bof and
    // either the search window starts at the queue head or the wanted seek
    // position lies before the queue's seek_start.
    bool is_bof = ds->queue->is_bof &&
        (first == ds->queue->head || ds->back_seek_pos < ds->queue->seek_start);

    struct demux_packet *target = NULL; // resume pos
    // nr. of keyframes, incl. target, excl. restart_pos
    // (At BOF, settle for fewer keyframes than requested if that is all
    // there is.)
    int got_total = num_kf < total && is_bof ? num_kf : total;
    int got_preroll = MPMAX(got_total - batch, 0);

    if (got_total == 1) {
        target = pre_1;
    } else if (got_total <= num_kf) {
        // Pick the keyframe from which exactly got_total keyframes remain
        // before back_restart. cur_kf counts the keyframes skipped so far.
        int cur_kf = 0;
        for (struct demux_packet *dp = first; dp != back_restart; dp = dp->next) {
            if (dp->keyframe) {
                if (num_kf - cur_kf == got_total) {
                    target = dp;
                    break;
                }
                cur_kf++;
            }
        }
    }

    if (!target) {
        if (is_bof) {
            // Nothing earlier exists; back_range_count = -1 marks this state
            // for the reader.
            MP_VERBOSE(in, "BOF for stream %d\n", ds->index);
            ds->back_restarting = false;
            ds->back_range_started = false;
            ds->back_range_count = -1;
            ds->back_range_preroll = 0;
            ds->need_wakeup = true;
            wakeup_ds(ds);
            return;
        }
        goto resume_earlier;
    }

    // Skip reader_head from previous keyframe to current one.
    // Or if preroll is involved, the first preroll packet.
    // (The advance call is outside the assert, so it also runs with NDEBUG.)
    while (ds->reader_head != target) {
        if (!advance_reader_head(ds))
            assert(0); // target must be in list
    }

    // Remember the time of the new range start as the next backward seek
    // position, if the range has a timestamp at all.
    double seek_pts;
    compute_keyframe_times(target, &seek_pts, NULL);
    if (seek_pts != MP_NOPTS_VALUE)
        ds->back_seek_pos = seek_pts;

    // For next backward adjust action.
    // restart_pkt is the keyframe that follows got_preroll keyframes counted
    // from target, i.e. the first keyframe that is not preroll.
    struct demux_packet *restart_pkt = NULL;
    int kf_pos = 0;
    for (struct demux_packet *dp = target; dp; dp = dp->next) {
        if (dp->keyframe) {
            if (kf_pos == got_preroll) {
                restart_pkt = dp;
                break;
            }
            kf_pos++;
        }
    }
    assert(restart_pkt);
    ds->back_restart_dts = restart_pkt->dts;
    ds->back_restart_pos = restart_pkt->pos;

    // Restart position found: publish the new range and wake the reader.
    ds->back_restarting = false;
    ds->back_range_started = false;
    ds->back_range_count = got_total;
    ds->back_range_preroll = got_preroll;
    ds->need_wakeup = true;
    wakeup_ds(ds);
    return;

resume_earlier:
    // We want to seek back to get earlier packets. But before we do this, we
    // must be sure that other streams have initialized their state. The only
    // time when this state is not initialized is right after the seek that
    // started backward demuxing (not any subsequent backstep seek). If this
    // initialization is omitted, the stream would try to start demuxing from
    // the "current" position. If another stream backstepped before that, the
    // other stream will miss the original seek target, and start playback from
    // a position that is too early.
    for (int n = 0; n < in->num_streams; n++) {
        struct demux_stream *ds2 = in->streams[n]->ds;
        if (ds2 == ds || !ds2->eager)
            continue;

        if (ds2->back_restarting && ds2->back_restart_next) {
            MP_VERBOSE(in, "delaying stream %d for %d\n", ds->index, ds2->index);
            return;
        }
    }

    if (ds->back_seek_pos != MP_NOPTS_VALUE) {
        // First try to satisfy the step back from packets that are already
        // queued, looking slightly (1ms) before the current seek position.
        struct demux_packet *t =
            find_seek_target(ds->queue, ds->back_seek_pos - 0.001, 0);
        if (t && t != ds->reader_head) {
            double pts;
            compute_keyframe_times(t, &pts, NULL);
            ds->back_seek_pos = MP_PTS_MIN(ds->back_seek_pos, pts);
            ds_clear_reader_state(ds, false);
            ds->reader_head = t;
            // Schedule another pass of this function (see
            // check_backward_seek()).
            ds->back_need_recheck = true;
            in->back_any_need_recheck = true;
            pthread_cond_signal(&in->wakeup);
        } else {
            // Nothing usable is queued: move the seek position further back
            // and flag that a real backward seek is needed.
            // NOTE(review): presumably need_back_seek leads to
            // perform_backward_seek() being called elsewhere - confirm.
            ds->back_seek_pos -= in->opts->back_seek_size;
            in->need_back_seek = true;
        }
    }
}
1579
1580 // Process that one or multiple packets were added.
back_demux_see_packets(struct demux_stream * ds)1581 static void back_demux_see_packets(struct demux_stream *ds)
1582 {
1583 struct demux_internal *in = ds->in;
1584
1585 if (!ds->selected || !in->back_demuxing || !ds->eager)
1586 return;
1587
1588 assert(!(ds->back_resuming && ds->back_restarting));
1589
1590 if (!ds->global_correct_dts && !ds->global_correct_pos) {
1591 MP_ERR(in, "Can't demux backward due to demuxer problems.\n");
1592 error_on_backward_demuxing(in);
1593 return;
1594 }
1595
1596 while (ds->back_resuming && ds->reader_head) {
1597 struct demux_packet *head = ds->reader_head;
1598 if ((ds->global_correct_dts && head->dts == ds->back_resume_dts) ||
1599 (ds->global_correct_pos && head->pos == ds->back_resume_pos))
1600 {
1601 ds->back_resuming = false;
1602 ds->need_wakeup = true;
1603 wakeup_ds(ds); // probably
1604 break;
1605 }
1606 advance_reader_head(ds);
1607 }
1608
1609 if (ds->back_restarting)
1610 find_backward_restart_pos(ds);
1611 }
1612
1613 // Add the keyframe to the end of the index. Not all packets are actually added.
add_index_entry(struct demux_queue * queue,struct demux_packet * dp,double pts)1614 static void add_index_entry(struct demux_queue *queue, struct demux_packet *dp,
1615 double pts)
1616 {
1617 struct demux_internal *in = queue->ds->in;
1618
1619 assert(dp->keyframe && pts != MP_NOPTS_VALUE);
1620
1621 if (queue->num_index > 0) {
1622 struct index_entry *last = &QUEUE_INDEX_ENTRY(queue, queue->num_index - 1);
1623 if (pts - last->pts < INDEX_STEP_SIZE)
1624 return;
1625 }
1626
1627 if (queue->num_index == queue->index_size) {
1628 // Needs to honor power-of-2 requirement.
1629 size_t new_size = MPMAX(128, queue->index_size * 2);
1630 assert(!(new_size & (new_size - 1)));
1631 MP_DBG(in, "stream %d: resize index to %zu\n", queue->ds->index,
1632 new_size);
1633 // Note: we could tolerate allocation failure, and just discard the
1634 // entire index (and prevent the index from being recreated).
1635 MP_RESIZE_ARRAY(NULL, queue->index, new_size);
1636 size_t highest_index = queue->index0 + queue->num_index;
1637 for (size_t n = queue->index_size; n < highest_index; n++)
1638 queue->index[n] = queue->index[n - queue->index_size];
1639 in->total_bytes +=
1640 (new_size - queue->index_size) * sizeof(queue->index[0]);
1641 queue->index_size = new_size;
1642 }
1643
1644 assert(queue->num_index < queue->index_size);
1645
1646 queue->num_index += 1;
1647
1648 QUEUE_INDEX_ENTRY(queue, queue->num_index - 1) = (struct index_entry){
1649 .pts = pts,
1650 .pkt = dp,
1651 };
1652 }
1653
// Look for another cached range that starts at or after the start of the
// current range and overlaps its end, and if there is one, try to join it
// into the current range.
// Among the candidates, the one with the smallest positive overlap
// (current->seek_end - range->seek_start) is chosen. On success its packets,
// index entries and metadata are moved into the current range and a seek to
// near the end of the joined data is requested. Whether joining succeeds or
// fails, the chosen range is cleared afterwards.
// The current range is asserted to be the last entry of in->ranges.
static void attempt_range_joining(struct demux_internal *in)
{
    struct demux_cached_range *current = in->current_range;
    struct demux_cached_range *next = NULL;
    double next_dist = INFINITY;

    assert(current && in->num_ranges > 0);
    assert(current == in->ranges[in->num_ranges - 1]);

    // The last entry is the current range itself, hence num_ranges - 1.
    for (int n = 0; n < in->num_ranges - 1; n++) {
        struct demux_cached_range *range = in->ranges[n];

        if (current->seek_start <= range->seek_start) {
            // This uses ">" to get some non-0 overlap.
            double dist = current->seek_end - range->seek_start;
            if (dist > 0 && dist < next_dist) {
                next = range;
                next_dist = dist;
            }
        }
    }

    if (!next)
        return;

    MP_VERBOSE(in, "going to join ranges %f-%f + %f-%f\n",
               current->seek_start, current->seek_end,
               next->seek_start, next->seek_end);

    // Try to find a join point, where packets obviously overlap. (It would be
    // better and faster to do this incrementally, but probably too complex.)
    // The current range can overlap arbitrarily with the next one, not only
    // by the seek overlap, but for arbitrary packet readahead as well.
    // We also drop the overlapping packets (if joining fails, we discard the
    // entire next range anyway, so this does no harm).
    for (int n = 0; n < in->num_streams; n++) {
        struct demux_stream *ds = in->streams[n]->ds;

        // q1: queue of this stream in the current range; q2: in the next one.
        struct demux_queue *q1 = current->streams[n];
        struct demux_queue *q2 = next->streams[n];

        // Packets are matched by dts or pos; at least one must be reliable.
        if (!ds->global_correct_pos && !ds->global_correct_dts) {
            MP_WARN(in, "stream %d: ranges unjoinable\n", n);
            goto failed;
        }

        struct demux_packet *end = q1->tail;
        bool join_point_found = !end; // no packets yet -> joining will work
        if (end) {
            // Drop packets from the head of q2 until the one equal to the
            // tail of q1 has been found (and dropped as well).
            while (q2->head) {
                struct demux_packet *dp = q2->head;

                // Some weird corner-case. We'd have to search the equivalent
                // packet in q1 to update it correctly. Better just give up.
                if (dp == q2->keyframe_latest) {
                    MP_VERBOSE(in, "stream %d: not enough keyframes for join\n", n);
                    goto failed;
                }

                if ((ds->global_correct_dts && dp->dts == end->dts) ||
                    (ds->global_correct_pos && dp->pos == end->pos))
                {
                    // Do some additional checks as a (imperfect) sanity check
                    // in case pos/dts are not "correct" across the ranges (we
                    // never actually check that).
                    if (dp->dts != end->dts || dp->pos != end->pos ||
                        dp->pts != end->pts)
                    {
                        MP_WARN(in,
                                "stream %d: non-repeatable demuxer behavior\n", n);
                        goto failed;
                    }

                    remove_head_packet(q2);
                    join_point_found = true;
                    break;
                }

                // This happens if the next range misses the end packet. For
                // normal streams (ds->eager==true), this is a failure to find
                // an overlap. For subtitles, this can mean the current_range
                // has a subtitle somewhere before the end of its range, and
                // next has another subtitle somewhere after the start of its
                // range.
                if ((ds->global_correct_dts && dp->dts > end->dts) ||
                    (ds->global_correct_pos && dp->pos > end->pos))
                    break;

                remove_head_packet(q2);
            }
        }

        // For enabled non-sparse streams, always require an overlap packet.
        if (ds->eager && !join_point_found) {
            MP_WARN(in, "stream %d: no join point found\n", n);
            goto failed;
        }
    }

    // Actually join the ranges. Now that we think it will work, mutate the
    // data associated with the current range.

    for (int n = 0; n < in->num_streams; n++) {
        struct demux_queue *q1 = current->streams[n];
        struct demux_queue *q2 = next->streams[n];

        struct demux_stream *ds = in->streams[n]->ds;
        assert(ds->queue == q1);

        // First new packet that is appended to the current range.
        struct demux_packet *join_point = q2->head;

        // Splice the remaining q2 packet list onto the end of q1.
        if (q2->head) {
            if (q1->head) {
                q1->tail->next = q2->head;
            } else {
                q1->head = q2->head;
            }
            q1->tail = q2->tail;
        }

        // q1 takes over the end-of-queue state of q2. The correctness flags
        // stay set only if they were set for both queues.
        q1->seek_end = q2->seek_end;
        q1->correct_dts &= q2->correct_dts;
        q1->correct_pos &= q2->correct_pos;
        q1->last_pos = q2->last_pos;
        q1->last_dts = q2->last_dts;
        q1->last_ts = q2->last_ts;
        q1->keyframe_latest = q2->keyframe_latest;
        q1->is_eof = q2->is_eof;

        q1->last_pos_fixup = -1;

        // q2 no longer references any of the moved packets.
        q2->head = q2->tail = NULL;
        q2->keyframe_first = NULL;
        q2->keyframe_latest = NULL;

        // A selected stream whose reader had run out of packets continues at
        // the first newly appended packet.
        if (ds->selected && !ds->reader_head)
            ds->reader_head = join_point;
        ds->skip_to_keyframe = false;

        // Make the cum_pos values in all q2 packets continuous.
        // (Each packet's size is derived from the old cum_pos of its
        // successor, which has not been rewritten yet at that point.)
        for (struct demux_packet *dp = join_point; dp; dp = dp->next) {
            uint64_t next_pos = dp->next ? dp->next->cum_pos : q2->tail_cum_pos;
            uint64_t size = next_pos - dp->cum_pos;
            dp->cum_pos = q1->tail_cum_pos;
            q1->tail_cum_pos += size;
        }

        // And update the index with packets from q2.
        for (size_t i = 0; i < q2->num_index; i++) {
            struct index_entry *e = &QUEUE_INDEX_ENTRY(q2, i);
            add_index_entry(q1, e->pkt, e->pts);
        }
        free_index(q2);

        // For moving demuxer position.
        ds->refreshing = ds->selected;
    }

    // Move the metadata entries over; next is left with none.
    for (int n = 0; n < next->num_metadata; n++) {
        MP_TARRAY_APPEND(current, current->metadata, current->num_metadata,
                         next->metadata[n]);
    }
    next->num_metadata = 0;

    update_seek_ranges(current);

    // Move demuxing position to after the current range.
    // (The target is placed 1.0 before the end of the joined-in range.)
    in->seeking = true;
    in->seek_flags = SEEK_HR;
    in->seek_pts = next->seek_end - 1.0;

    MP_VERBOSE(in, "ranges joined!\n");

    for (int n = 0; n < in->num_streams; n++)
        back_demux_see_packets(in->streams[n]->ds);

    // Note: the success path deliberately falls through into the cleanup
    // below. After a successful join the packet lists of next have been
    // emptied above.
failed:
    clear_cached_range(in, next);
    free_empty_cached_ranges(in);
}
1837
1838 // Compute the assumed first and last frame timestamp for keyframe range
1839 // starting at pkt. To get valid results, pkt->keyframe must be true, otherwise
1840 // nonsense will be returned.
1841 // Always sets *out_kf_min and *out_kf_max without reading them. Both are set
1842 // to NOPTS if there are no timestamps at all in the stream. *kf_max will not
1843 // be set to the actual end time of the decoded output, just the last frame
1844 // (audio will typically end up with kf_min==kf_max).
1845 // Either of out_kf_min and out_kf_max can be NULL, which discards the result.
1846 // Return the next keyframe packet after pkt, or NULL if there's none.
compute_keyframe_times(struct demux_packet * pkt,double * out_kf_min,double * out_kf_max)1847 static struct demux_packet *compute_keyframe_times(struct demux_packet *pkt,
1848 double *out_kf_min,
1849 double *out_kf_max)
1850 {
1851 struct demux_packet *start = pkt;
1852 double min = MP_NOPTS_VALUE;
1853 double max = MP_NOPTS_VALUE;
1854
1855 while (pkt) {
1856 if (pkt->keyframe && pkt != start)
1857 break;
1858
1859 double ts = MP_PTS_OR_DEF(pkt->pts, pkt->dts);
1860 if (pkt->segmented && ((pkt->start != MP_NOPTS_VALUE && ts < pkt->start) ||
1861 (pkt->end != MP_NOPTS_VALUE && ts > pkt->end)))
1862 ts = MP_NOPTS_VALUE;
1863
1864 min = MP_PTS_MIN(min, ts);
1865 max = MP_PTS_MAX(max, ts);
1866
1867 pkt = pkt->next;
1868 }
1869
1870 if (out_kf_min)
1871 *out_kf_min = min;
1872 if (out_kf_max)
1873 *out_kf_max = max;
1874 return pkt;
1875 }
1876
1877 // Determine seekable range when a packet is added. If dp==NULL, treat it as
1878 // EOF (i.e. closes the current block).
1879 // This has to deal with a number of corner cases, such as demuxers potentially
1880 // starting output at non-keyframes.
1881 // Can join seek ranges, which messes with in->current_range and all.
adjust_seek_range_on_packet(struct demux_stream * ds,struct demux_packet * dp)1882 static void adjust_seek_range_on_packet(struct demux_stream *ds,
1883 struct demux_packet *dp)
1884 {
1885 struct demux_queue *queue = ds->queue;
1886
1887 if (!ds->in->seekable_cache)
1888 return;
1889
1890 bool new_eof = !dp;
1891 bool update_ranges = queue->is_eof != new_eof;
1892 queue->is_eof = new_eof;
1893
1894 if (!dp || dp->keyframe) {
1895 if (queue->keyframe_latest) {
1896 double kf_min, kf_max;
1897 compute_keyframe_times(queue->keyframe_latest, &kf_min, &kf_max);
1898
1899 if (kf_min != MP_NOPTS_VALUE) {
1900 add_index_entry(queue, queue->keyframe_latest, kf_min);
1901
1902 // Initialize the queue's start if it's unset.
1903 if (queue->seek_start == MP_NOPTS_VALUE) {
1904 update_ranges = true;
1905 queue->seek_start = kf_min + ds->sh->seek_preroll;
1906 }
1907 }
1908
1909 if (kf_max != MP_NOPTS_VALUE &&
1910 (queue->seek_end == MP_NOPTS_VALUE || kf_max > queue->seek_end))
1911 {
1912 // If the queue was past the current range's end even before
1913 // this update, it means _other_ streams are not there yet,
1914 // and the seek range doesn't need to be updated. This means
1915 // if the _old_ queue->seek_end was already after the range end,
1916 // then the new seek_end won't extend the range either.
1917 if (queue->range->seek_end == MP_NOPTS_VALUE ||
1918 queue->seek_end <= queue->range->seek_end)
1919 {
1920 update_ranges = true;
1921 }
1922
1923 queue->seek_end = kf_max;
1924 }
1925 }
1926
1927 queue->keyframe_latest = dp;
1928 }
1929
1930 // Adding a sparse packet never changes the seek range.
1931 if (update_ranges && ds->eager) {
1932 update_seek_ranges(queue->range);
1933 attempt_range_joining(ds->in);
1934 }
1935 }
1936
recorder_create(struct demux_internal * in,const char * dst)1937 static struct mp_recorder *recorder_create(struct demux_internal *in,
1938 const char *dst)
1939 {
1940 struct sh_stream **streams = NULL;
1941 int num_streams = 0;
1942 for (int n = 0; n < in->num_streams; n++) {
1943 struct sh_stream *stream = in->streams[n];
1944 if (stream->ds->selected)
1945 MP_TARRAY_APPEND(NULL, streams, num_streams, stream);
1946 }
1947
1948 struct demuxer *demuxer = in->d_thread;
1949 struct demux_attachment **attachments = talloc_array(NULL, struct demux_attachment*, demuxer->num_attachments);
1950 for (int n = 0; n < demuxer->num_attachments; n++) {
1951 attachments[n] = &demuxer->attachments[n];
1952 }
1953
1954 struct mp_recorder *res = mp_recorder_create(in->d_thread->global, dst,
1955 streams, num_streams,
1956 attachments, demuxer->num_attachments);
1957 talloc_free(streams);
1958 talloc_free(attachments);
1959 return res;
1960 }
1961
write_dump_packet(struct demux_internal * in,struct demux_packet * dp)1962 static void write_dump_packet(struct demux_internal *in, struct demux_packet *dp)
1963 {
1964 assert(in->dumper);
1965 assert(in->dumper_status == CONTROL_TRUE);
1966
1967 struct mp_recorder_sink *sink =
1968 mp_recorder_get_sink(in->dumper, in->streams[dp->stream]);
1969 if (sink) {
1970 mp_recorder_feed_packet(sink, dp);
1971 } else {
1972 MP_ERR(in, "New stream appeared; stopping recording.\n");
1973 in->dumper_status = CONTROL_ERROR;
1974 }
1975 }
1976
record_packet(struct demux_internal * in,struct demux_packet * dp)1977 static void record_packet(struct demux_internal *in, struct demux_packet *dp)
1978 {
1979 // (should preferably be outside of the lock)
1980 if (in->enable_recording && !in->recorder &&
1981 in->opts->record_file && in->opts->record_file[0])
1982 {
1983 // Later failures shouldn't make it retry and overwrite the previously
1984 // recorded file.
1985 in->enable_recording = false;
1986
1987 in->recorder = recorder_create(in, in->opts->record_file);
1988 if (!in->recorder)
1989 MP_ERR(in, "Disabling recording.\n");
1990 }
1991
1992 if (in->recorder) {
1993 struct mp_recorder_sink *sink =
1994 mp_recorder_get_sink(in->recorder, in->streams[dp->stream]);
1995 if (sink) {
1996 mp_recorder_feed_packet(sink, dp);
1997 } else {
1998 MP_ERR(in, "New stream appeared; stopping recording.\n");
1999 mp_recorder_destroy(in->recorder);
2000 in->recorder = NULL;
2001 }
2002 }
2003
2004 if (in->dumper_status == CONTROL_OK)
2005 write_dump_packet(in, dp);
2006 }
2007
// Append a packet delivered by the demuxer to the packet queue of its stream.
// read_packet() calls this with in->lock held. Takes ownership of dp: it is
// either linked into the stream's current queue or freed.
// stream must be a valid stream of this demuxer (asserted), and dp must not be
// linked into any list yet.
// The packet is discarded (freed instead of queued) if it is empty, if the
// demuxer was cancelled, if the stream is not selected, if a seek is queued,
// if the stream has an attached picture, or while the stream is "refreshing".
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
{
    struct demux_stream *ds = stream ? stream->ds : NULL;
    // Everything below dereferences ds and ds->in. Fail with a clear assertion
    // rather than a NULL pointer read if a bogus stream was passed.
    assert(ds && ds->in);

    if (!dp->len || demux_cancel_test(ds->in->d_thread)) {
        talloc_free(dp);
        return;
    }

    assert(dp->stream == stream->index);
    assert(!dp->next);

    struct demux_internal *in = ds->in;

    in->after_seek = false;
    in->after_seek_to_start = false;

    // Timestamp used for tracking the demuxer position; prefers DTS, and is
    // clamped to the segment end for segmented packets.
    double ts = dp->dts == MP_NOPTS_VALUE ? dp->pts : dp->dts;
    if (dp->segmented)
        ts = MP_PTS_MIN(ts, dp->end);

    if (ts != MP_NOPTS_VALUE)
        in->demux_ts = ts;

    struct demux_queue *queue = ds->queue;

    bool drop = !ds->selected || in->seeking || ds->sh->attached_picture;

    if (!drop) {
        // If libavformat splits packets, some packets will have pos unset, so
        // make up one based on the first packet => makes refresh seeks work.
        if ((dp->pos < 0 || dp->pos == queue->last_pos_fixup) &&
            !dp->keyframe && queue->last_pos_fixup >= 0)
            dp->pos = queue->last_pos_fixup + 1;
        queue->last_pos_fixup = dp->pos;
    }

    if (!drop && ds->refreshing) {
        // Resume reading once the old position was reached (i.e. we start
        // returning packets where we left off before the refresh).
        // If it's the same position, drop, but continue normally next time.
        if (queue->correct_dts) {
            ds->refreshing = dp->dts < queue->last_dts;
        } else if (queue->correct_pos) {
            ds->refreshing = dp->pos < queue->last_pos;
        } else {
            ds->refreshing = false; // should not happen
            MP_WARN(in, "stream %d: demux refreshing failed\n", ds->index);
        }
        drop = true;
    }

    if (drop) {
        talloc_free(dp);
        return;
    }

    record_packet(in, dp);

    // With the disk cache enabled, the payload is written to the cache, and
    // only the packet metadata stays in memory.
    if (in->cache && in->opts->disk_cache) {
        int64_t pos = demux_cache_write(in->cache, dp);
        if (pos >= 0) {
            demux_packet_unref_contents(dp);
            dp->is_cached = true;
            dp->cached_data.pos = pos;
        }
    }

    // Track whether pos/dts are strictly increasing (needed for refreshing).
    queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos;
    queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts;
    queue->last_pos = dp->pos;
    queue->last_dts = dp->dts;
    ds->global_correct_pos &= queue->correct_pos;
    ds->global_correct_dts &= queue->correct_dts;

    // (keep in mind that even if the reader went out of data, the queue is not
    // necessarily empty due to the backbuffer)
    if (!ds->reader_head && (!ds->skip_to_keyframe || dp->keyframe)) {
        ds->reader_head = dp;
        ds->skip_to_keyframe = false;
    }

    size_t bytes = demux_packet_estimate_total_size(dp);
    in->total_bytes += bytes;
    dp->cum_pos = queue->tail_cum_pos;
    queue->tail_cum_pos += bytes;

    if (queue->tail) {
        // next packet in stream
        queue->tail->next = dp;
        queue->tail = dp;
    } else {
        // first packet in stream
        queue->head = queue->tail = dp;
    }

    if (!ds->ignore_eof) {
        // obviously not true anymore
        ds->eof = false;
        in->eof = false;
    }

    // For video, PTS determination is not trivial, but for other media types
    // distinguishing PTS and DTS is not useful.
    if (stream->type != STREAM_VIDEO && dp->pts == MP_NOPTS_VALUE)
        dp->pts = dp->dts;

    if (ts != MP_NOPTS_VALUE && (ts > queue->last_ts || ts + 10 < queue->last_ts))
        queue->last_ts = ts;
    if (ds->base_ts == MP_NOPTS_VALUE)
        ds->base_ts = queue->last_ts;

    const char *num_pkts = queue->head == queue->tail ? "1" : ">1";
    uint64_t fw_bytes = get_foward_buffered_bytes(ds);
    MP_TRACE(in, "append packet to %s: size=%zu pts=%f dts=%f pos=%"PRIi64" "
             "[num=%s size=%zd]\n", stream_type_name(stream->type),
             dp->len, dp->pts, dp->dts, dp->pos, num_pkts, (size_t)fw_bytes);

    adjust_seek_range_on_packet(ds, dp);

    // May need to reduce backward cache.
    // NOTE: prune_old_packets() removes queued packets up to reader_head. While
    // reader_head is unset this can include dp itself (presumably freeing it),
    // so dp must not be accessed anymore below this point.
    prune_old_packets(in);

    // Possibly update duration based on highest TS demuxed (but ignore subs).
    // ts was already clamped to the segment end when it was computed above, so
    // there is no need to look at dp again.
    if (stream->type != STREAM_SUB && ts > in->highest_av_pts) {
        in->highest_av_pts = ts;
        double duration = in->highest_av_pts - in->d_thread->start_time;
        if (duration > in->d_thread->duration) {
            in->d_thread->duration = duration;
            // (Don't wakeup user thread, would be too noisy.)
            in->events |= DEMUX_EVENT_DURATION;
            in->duration = duration;
        }
    }

    // Don't process the packet further if it's skipped by the previous seek
    // (see reader_head check/assignment above).
    if (!ds->reader_head)
        return;

    back_demux_see_packets(ds);

    wakeup_ds(ds);
}
2155
mark_stream_eof(struct demux_stream * ds)2156 static void mark_stream_eof(struct demux_stream *ds)
2157 {
2158 if (!ds->eof) {
2159 ds->eof = true;
2160 adjust_seek_range_on_packet(ds, NULL);
2161 back_demux_see_packets(ds);
2162 wakeup_ds(ds);
2163 }
2164 }
2165
lazy_stream_needs_wait(struct demux_stream * ds)2166 static bool lazy_stream_needs_wait(struct demux_stream *ds)
2167 {
2168 struct demux_internal *in = ds->in;
2169 // Attempt to read until force_read_until was reached, or reading has
2170 // stopped for some reason (true EOF, queue overflow).
2171 return !ds->eager && !ds->reader_head && !in->back_demuxing &&
2172 !in->eof && ds->force_read_until != MP_NOPTS_VALUE &&
2173 (in->demux_ts == MP_NOPTS_VALUE ||
2174 in->demux_ts <= ds->force_read_until);
2175 }
2176
2177 // Returns true if there was "progress" (lock was released temporarily).
read_packet(struct demux_internal * in)2178 static bool read_packet(struct demux_internal *in)
2179 {
2180 bool was_reading = in->reading;
2181 in->reading = false;
2182
2183 if (!was_reading || in->blocked || demux_cancel_test(in->d_thread))
2184 return false;
2185
2186 // Check if we need to read a new packet. We do this if all queues are below
2187 // the minimum, or if a stream explicitly needs new packets. Also includes
2188 // safe-guards against packet queue overflow.
2189 bool read_more = false, prefetch_more = false, refresh_more = false;
2190 uint64_t total_fw_bytes = 0;
2191 for (int n = 0; n < in->num_streams; n++) {
2192 struct demux_stream *ds = in->streams[n]->ds;
2193 if (ds->eager) {
2194 read_more |= !ds->reader_head;
2195 if (in->back_demuxing)
2196 read_more |= ds->back_restarting || ds->back_resuming;
2197 } else {
2198 if (lazy_stream_needs_wait(ds)) {
2199 read_more = true;
2200 } else {
2201 mark_stream_eof(ds); // let playback continue
2202 }
2203 }
2204 refresh_more |= ds->refreshing;
2205 if (ds->eager && ds->queue->last_ts != MP_NOPTS_VALUE &&
2206 in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE &&
2207 ds->queue->last_ts >= ds->base_ts &&
2208 !in->back_demuxing)
2209 prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs;
2210 total_fw_bytes += get_foward_buffered_bytes(ds);
2211 }
2212
2213 MP_TRACE(in, "bytes=%zd, read_more=%d prefetch_more=%d, refresh_more=%d\n",
2214 (size_t)total_fw_bytes, read_more, prefetch_more, refresh_more);
2215 if (total_fw_bytes >= in->max_bytes) {
2216 // if we hit the limit just by prefetching, simply stop prefetching
2217 if (!read_more)
2218 return false;
2219 if (!in->warned_queue_overflow) {
2220 in->warned_queue_overflow = true;
2221 MP_WARN(in, "Too many packets in the demuxer packet queues:\n");
2222 for (int n = 0; n < in->num_streams; n++) {
2223 struct demux_stream *ds = in->streams[n]->ds;
2224 if (ds->selected) {
2225 size_t num_pkts = 0;
2226 for (struct demux_packet *dp = ds->reader_head;
2227 dp; dp = dp->next)
2228 num_pkts++;
2229 uint64_t fw_bytes = get_foward_buffered_bytes(ds);
2230 MP_WARN(in, " %s/%d: %zd packets, %zd bytes%s%s\n",
2231 stream_type_name(ds->type), n,
2232 num_pkts, (size_t)fw_bytes,
2233 ds->eager ? "" : " (lazy)",
2234 ds->refreshing ? " (refreshing)" : "");
2235 }
2236 }
2237 if (in->back_demuxing)
2238 MP_ERR(in, "Backward playback is likely stuck/broken now.\n");
2239 }
2240 for (int n = 0; n < in->num_streams; n++) {
2241 struct demux_stream *ds = in->streams[n]->ds;
2242 if (!ds->reader_head)
2243 mark_stream_eof(ds);
2244 }
2245 return false;
2246 }
2247
2248 if (!read_more && !prefetch_more && !refresh_more)
2249 return false;
2250
2251 if (in->after_seek_to_start) {
2252 for (int n = 0; n < in->num_streams; n++) {
2253 struct demux_stream *ds = in->streams[n]->ds;
2254 in->current_range->streams[n]->is_bof =
2255 ds->selected && !ds->refreshing;
2256 }
2257 }
2258
2259 // Actually read a packet. Drop the lock while doing so, because waiting
2260 // for disk or network I/O can take time.
2261 in->reading = true;
2262 in->after_seek = false;
2263 in->after_seek_to_start = false;
2264 pthread_mutex_unlock(&in->lock);
2265
2266 struct demuxer *demux = in->d_thread;
2267 struct demux_packet *pkt = NULL;
2268
2269 bool eof = true;
2270 if (demux->desc->read_packet && !demux_cancel_test(demux))
2271 eof = !demux->desc->read_packet(demux, &pkt);
2272
2273 pthread_mutex_lock(&in->lock);
2274 update_cache(in);
2275
2276 if (pkt) {
2277 assert(pkt->stream >= 0 && pkt->stream < in->num_streams);
2278 add_packet_locked(in->streams[pkt->stream], pkt);
2279 }
2280
2281 if (!in->seeking) {
2282 if (eof) {
2283 for (int n = 0; n < in->num_streams; n++)
2284 mark_stream_eof(in->streams[n]->ds);
2285 // If we had EOF previously, then don't wakeup (avoids wakeup loop)
2286 if (!in->eof) {
2287 if (in->wakeup_cb)
2288 in->wakeup_cb(in->wakeup_cb_ctx);
2289 pthread_cond_signal(&in->wakeup);
2290 MP_VERBOSE(in, "EOF reached.\n");
2291 }
2292 }
2293 in->eof = eof;
2294 in->reading = !eof;
2295 }
2296 return true;
2297 }
2298
prune_old_packets(struct demux_internal * in)2299 static void prune_old_packets(struct demux_internal *in)
2300 {
2301 assert(in->current_range == in->ranges[in->num_ranges - 1]);
2302
2303 // It's not clear what the ideal way to prune old packets is. For now, we
2304 // prune the oldest packet runs, as long as the total cache amount is too
2305 // big.
2306 while (1) {
2307 uint64_t fw_bytes = 0;
2308 for (int n = 0; n < in->num_streams; n++) {
2309 struct demux_stream *ds = in->streams[n]->ds;
2310 fw_bytes += get_foward_buffered_bytes(ds);
2311 }
2312 uint64_t max_avail = in->max_bytes_bw;
2313 // Backward cache (if enabled at all) can use unused forward cache.
2314 // Still leave 1 byte free, so the read_packet logic doesn't get stuck.
2315 if (max_avail && in->max_bytes > (fw_bytes + 1) && in->opts->donate_fw)
2316 max_avail += in->max_bytes - (fw_bytes + 1);
2317 if (in->total_bytes - fw_bytes <= max_avail)
2318 break;
2319
2320 // (Start from least recently used range.)
2321 struct demux_cached_range *range = in->ranges[0];
2322 double earliest_ts = MP_NOPTS_VALUE;
2323 struct demux_stream *earliest_stream = NULL;
2324
2325 for (int n = 0; n < range->num_streams; n++) {
2326 struct demux_queue *queue = range->streams[n];
2327 struct demux_stream *ds = queue->ds;
2328
2329 if (queue->head && queue->head != ds->reader_head) {
2330 struct demux_packet *dp = queue->head;
2331 double ts = queue->seek_start;
2332 // If the ts is NOPTS, the queue has no retainable packets, so
2333 // delete them all. This code is not run when there's enough
2334 // free space, so normally the queue gets the chance to build up.
2335 bool prune_always =
2336 !in->seekable_cache || ts == MP_NOPTS_VALUE || !dp->keyframe;
2337 if (prune_always || !earliest_stream || ts < earliest_ts) {
2338 earliest_ts = ts;
2339 earliest_stream = ds;
2340 if (prune_always)
2341 break;
2342 }
2343 }
2344 }
2345
2346 // In some cases (like when the seek index became huge), there aren't
2347 // any backwards packets, even if the total cache size is exceeded.
2348 if (!earliest_stream)
2349 break;
2350
2351 struct demux_stream *ds = earliest_stream;
2352 struct demux_queue *queue = range->streams[ds->index];
2353
2354 bool non_kf_prune = queue->head && !queue->head->keyframe;
2355 bool kf_was_pruned = false;
2356
2357 while (queue->head && queue->head != ds->reader_head) {
2358 if (queue->head->keyframe) {
2359 // If the cache is seekable, only delete until up the next
2360 // keyframe. This is not always efficient, but ensures we
2361 // prune all streams fairly.
2362 // Also, if the first packet was _not_ a keyframe, we want it
2363 // to remove all preceding non-keyframe packets first, before
2364 // re-evaluating what to prune next.
2365 if ((kf_was_pruned || non_kf_prune) && in->seekable_cache)
2366 break;
2367 kf_was_pruned = true;
2368 }
2369
2370 remove_head_packet(queue);
2371 }
2372
2373 // Need to update the seekable time range.
2374 if (kf_was_pruned) {
2375 assert(!queue->keyframe_first); // it was just deleted, supposedly
2376
2377 queue->keyframe_first = queue->head;
2378 // (May happen if reader_head stopped pruning the range, and there's
2379 // no next range.)
2380 while (queue->keyframe_first && !queue->keyframe_first->keyframe)
2381 queue->keyframe_first = queue->keyframe_first->next;
2382
2383 if (queue->seek_start != MP_NOPTS_VALUE)
2384 queue->last_pruned = queue->seek_start;
2385
2386 double kf_min;
2387 compute_keyframe_times(queue->keyframe_first, &kf_min, NULL);
2388
2389 bool update_range = true;
2390
2391 queue->seek_start = kf_min;
2392
2393 if (queue->seek_start != MP_NOPTS_VALUE) {
2394 queue->seek_start += ds->sh->seek_preroll;
2395
2396 // Don't need to update if the new start is still before the
2397 // range's start (or if the range was undefined anyway).
2398 if (range->seek_start == MP_NOPTS_VALUE ||
2399 queue->seek_start <= range->seek_start)
2400 {
2401 update_range = false;
2402 }
2403 }
2404
2405 if (update_range)
2406 update_seek_ranges(range);
2407 }
2408
2409 if (range != in->current_range && range->seek_start == MP_NOPTS_VALUE)
2410 free_empty_cached_ranges(in);
2411 }
2412 }
2413
execute_trackswitch(struct demux_internal * in)2414 static void execute_trackswitch(struct demux_internal *in)
2415 {
2416 in->tracks_switched = false;
2417
2418 bool any_selected = false;
2419 for (int n = 0; n < in->num_streams; n++)
2420 any_selected |= in->streams[n]->ds->selected;
2421
2422 pthread_mutex_unlock(&in->lock);
2423
2424 if (in->d_thread->desc->switched_tracks)
2425 in->d_thread->desc->switched_tracks(in->d_thread);
2426
2427 pthread_mutex_lock(&in->lock);
2428 }
2429
execute_seek(struct demux_internal * in)2430 static void execute_seek(struct demux_internal *in)
2431 {
2432 int flags = in->seek_flags;
2433 double pts = in->seek_pts;
2434 in->eof = false;
2435 in->seeking = false;
2436 in->seeking_in_progress = pts;
2437 in->demux_ts = MP_NOPTS_VALUE;
2438 in->low_level_seeks += 1;
2439 in->after_seek = true;
2440 in->after_seek_to_start =
2441 !(flags & (SEEK_FORWARD | SEEK_FACTOR)) &&
2442 pts <= in->d_thread->start_time;
2443
2444 for (int n = 0; n < in->num_streams; n++)
2445 in->streams[n]->ds->queue->last_pos_fixup = -1;
2446
2447 if (in->recorder)
2448 mp_recorder_mark_discontinuity(in->recorder);
2449
2450 pthread_mutex_unlock(&in->lock);
2451
2452 MP_VERBOSE(in, "execute seek (to %f flags %d)\n", pts, flags);
2453
2454 if (in->d_thread->desc->seek)
2455 in->d_thread->desc->seek(in->d_thread, pts, flags);
2456
2457 MP_VERBOSE(in, "seek done\n");
2458
2459 pthread_mutex_lock(&in->lock);
2460
2461 in->seeking_in_progress = MP_NOPTS_VALUE;
2462 }
2463
update_opts(struct demux_internal * in)2464 static void update_opts(struct demux_internal *in)
2465 {
2466 struct demux_opts *opts = in->opts;
2467
2468 in->min_secs = opts->min_secs;
2469 in->max_bytes = opts->max_bytes;
2470 in->max_bytes_bw = opts->max_bytes_bw;
2471
2472 int seekable = opts->seekable_cache;
2473 bool is_streaming = in->d_thread->is_streaming;
2474 bool use_cache = is_streaming;
2475 if (opts->enable_cache >= 0)
2476 use_cache = opts->enable_cache == 1;
2477
2478 if (use_cache) {
2479 in->min_secs = MPMAX(in->min_secs, opts->min_secs_cache);
2480 if (seekable < 0)
2481 seekable = 1;
2482 }
2483 in->seekable_cache = seekable == 1;
2484 in->using_network_cache_opts = is_streaming && use_cache;
2485
2486 if (!in->seekable_cache)
2487 in->max_bytes_bw = 0;
2488
2489 if (!in->can_cache) {
2490 in->seekable_cache = false;
2491 in->min_secs = 0;
2492 in->max_bytes = 1;
2493 in->max_bytes_bw = 0;
2494 in->using_network_cache_opts = false;
2495 }
2496
2497 if (in->seekable_cache && opts->disk_cache && !in->cache) {
2498 in->cache = demux_cache_create(in->global, in->log);
2499 if (!in->cache)
2500 MP_ERR(in, "Failed to create file cache.\n");
2501 }
2502
2503 // The filename option really decides whether recording should be active.
2504 // So if the filename changes, act upon it.
2505 char *old = in->record_filename ? in->record_filename : "";
2506 char *new = opts->record_file ? opts->record_file : "";
2507 if (strcmp(old, new) != 0) {
2508 if (in->recorder) {
2509 MP_WARN(in, "Stopping recording.\n");
2510 mp_recorder_destroy(in->recorder);
2511 in->recorder = NULL;
2512 }
2513 talloc_free(in->record_filename);
2514 in->record_filename = talloc_strdup(in, opts->record_file);
2515 // Note: actual recording only starts once packets are read. It may be
2516 // important to delay creating in->recorder to that point, because the
2517 // demuxer might detect more streams until finding the first packet.
2518 in->enable_recording = in->can_record;
2519 }
2520
2521 // In case the cache was reduced in size.
2522 prune_old_packets(in);
2523
2524 // In case the seekable cache was disabled.
2525 free_empty_cached_ranges(in);
2526 }
2527
2528 // Make demuxing progress. Return whether progress was made.
thread_work(struct demux_internal * in)2529 static bool thread_work(struct demux_internal *in)
2530 {
2531 if (m_config_cache_update(in->opts_cache))
2532 update_opts(in);
2533 if (in->tracks_switched) {
2534 execute_trackswitch(in);
2535 return true;
2536 }
2537 if (in->need_back_seek) {
2538 perform_backward_seek(in);
2539 return true;
2540 }
2541 if (in->back_any_need_recheck) {
2542 check_backward_seek(in);
2543 return true;
2544 }
2545 if (in->seeking) {
2546 execute_seek(in);
2547 return true;
2548 }
2549 if (read_packet(in))
2550 return true; // read_packet unlocked, so recheck conditions
2551 if (mp_time_us() >= in->next_cache_update) {
2552 update_cache(in);
2553 return true;
2554 }
2555 return false;
2556 }
2557
demux_thread(void * pctx)2558 static void *demux_thread(void *pctx)
2559 {
2560 struct demux_internal *in = pctx;
2561 mpthread_set_name("demux");
2562 pthread_mutex_lock(&in->lock);
2563
2564 stats_register_thread_cputime(in->stats, "thread");
2565
2566 while (!in->thread_terminate) {
2567 if (thread_work(in))
2568 continue;
2569 pthread_cond_signal(&in->wakeup);
2570 struct timespec until = mp_time_us_to_timespec(in->next_cache_update);
2571 pthread_cond_timedwait(&in->wakeup, &in->lock, &until);
2572 }
2573
2574 if (in->shutdown_async) {
2575 pthread_mutex_unlock(&in->lock);
2576 demux_shutdown(in);
2577 pthread_mutex_lock(&in->lock);
2578 in->shutdown_async = false;
2579 if (in->wakeup_cb)
2580 in->wakeup_cb(in->wakeup_cb_ctx);
2581 }
2582
2583 stats_unregister_thread(in->stats, "thread");
2584
2585 pthread_mutex_unlock(&in->lock);
2586 return NULL;
2587 }
2588
2589 // Low-level part of dequeueing a packet.
advance_reader_head(struct demux_stream * ds)2590 static struct demux_packet *advance_reader_head(struct demux_stream *ds)
2591 {
2592 struct demux_packet *pkt = ds->reader_head;
2593 if (!pkt)
2594 return NULL;
2595
2596 ds->reader_head = pkt->next;
2597
2598 ds->last_ret_pos = pkt->pos;
2599 ds->last_ret_dts = pkt->dts;
2600
2601 return pkt;
2602 }
2603
2604 // Return a newly allocated new packet. The pkt parameter may be either a
2605 // in-memory packet (then a new reference is made), or a reference to
2606 // packet in the disk cache (then the packet is read from disk).
read_packet_from_cache(struct demux_internal * in,struct demux_packet * pkt)2607 static struct demux_packet *read_packet_from_cache(struct demux_internal *in,
2608 struct demux_packet *pkt)
2609 {
2610 if (!pkt)
2611 return NULL;
2612
2613 if (pkt->is_cached) {
2614 assert(in->cache);
2615 struct demux_packet *meta = pkt;
2616 pkt = demux_cache_read(in->cache, pkt->cached_data.pos);
2617 if (pkt) {
2618 demux_packet_copy_attribs(pkt, meta);
2619 } else {
2620 MP_ERR(in, "Failed to retrieve packet from cache.\n");
2621 }
2622 } else {
2623 // The returned packet is mutated etc. and will be owned by the user.
2624 pkt = demux_copy_packet(pkt);
2625 }
2626
2627 return pkt;
2628 }
2629
2630 // Returns:
2631 // < 0: EOF was reached, *res is not set
2632 // == 0: no new packet yet, wait, *res is not set
2633 // > 0: new packet is moved to *res
dequeue_packet(struct demux_stream * ds,double min_pts,struct demux_packet ** res)2634 static int dequeue_packet(struct demux_stream *ds, double min_pts,
2635 struct demux_packet **res)
2636 {
2637 struct demux_internal *in = ds->in;
2638
2639 if (!ds->selected)
2640 return -1;
2641 if (in->blocked)
2642 return 0;
2643
2644 if (ds->sh->attached_picture) {
2645 ds->eof = true;
2646 if (ds->attached_picture_added)
2647 return -1;
2648 ds->attached_picture_added = true;
2649 struct demux_packet *pkt = demux_copy_packet(ds->sh->attached_picture);
2650 if (!pkt)
2651 abort();
2652 pkt->stream = ds->sh->index;
2653 *res = pkt;
2654 return 1;
2655 }
2656
2657 if (!in->reading && (!in->eof || in->opts->force_retry_eof)) {
2658 in->reading = true; // enable demuxer thread prefetching
2659 pthread_cond_signal(&in->wakeup);
2660 }
2661
2662 ds->force_read_until = min_pts;
2663
2664 if (ds->back_resuming || ds->back_restarting) {
2665 assert(in->back_demuxing);
2666 return 0;
2667 }
2668
2669 bool eof = !ds->reader_head && ds->eof;
2670
2671 if (in->back_demuxing) {
2672 // Subtitles not supported => EOF.
2673 if (!ds->eager)
2674 return -1;
2675
2676 // Next keyframe (or EOF) was reached => step back.
2677 if (ds->back_range_started && !ds->back_range_count &&
2678 ((ds->reader_head && ds->reader_head->keyframe) || eof))
2679 {
2680 ds->back_restarting = true;
2681 ds->back_restart_eof = false;
2682 ds->back_restart_next = false;
2683
2684 find_backward_restart_pos(ds);
2685
2686 if (ds->back_restarting)
2687 return 0;
2688 }
2689
2690 eof = ds->back_range_count < 0;
2691 }
2692
2693 ds->need_wakeup = !ds->reader_head;
2694 if (!ds->reader_head || eof) {
2695 if (!ds->eager) {
2696 // Non-eager streams temporarily return EOF. If they returned 0,
2697 // the reader would have to wait for new packets, which does not
2698 // make sense due to the sparseness and passiveness of non-eager
2699 // streams.
2700 // Unless the min_pts feature is used: then EOF is only signaled
2701 // if read-ahead went above min_pts.
2702 if (!lazy_stream_needs_wait(ds))
2703 ds->eof = eof = true;
2704 }
2705 return eof ? -1 : 0;
2706 }
2707
2708 struct demux_packet *pkt = advance_reader_head(ds);
2709 assert(pkt);
2710 pkt = read_packet_from_cache(in, pkt);
2711 if (!pkt)
2712 return 0;
2713
2714 if (in->back_demuxing) {
2715 if (pkt->keyframe) {
2716 assert(ds->back_range_count > 0);
2717 ds->back_range_count -= 1;
2718 if (ds->back_range_preroll >= 0)
2719 ds->back_range_preroll -= 1;
2720 }
2721
2722 if (ds->back_range_preroll >= 0)
2723 pkt->back_preroll = true;
2724
2725 if (!ds->back_range_started) {
2726 pkt->back_restart = true;
2727 ds->back_range_started = true;
2728 }
2729 }
2730
2731 double ts = MP_PTS_OR_DEF(pkt->dts, pkt->pts);
2732 if (ts != MP_NOPTS_VALUE)
2733 ds->base_ts = ts;
2734
2735 if (pkt->keyframe && ts != MP_NOPTS_VALUE) {
2736 // Update bitrate - only at keyframe points, because we use the
2737 // (possibly) reordered packet timestamps instead of realtime.
2738 double d = ts - ds->last_br_ts;
2739 if (ds->last_br_ts == MP_NOPTS_VALUE || d < 0) {
2740 ds->bitrate = -1;
2741 ds->last_br_ts = ts;
2742 ds->last_br_bytes = 0;
2743 } else if (d >= 0.5) { // a window of least 500ms for UI purposes
2744 ds->bitrate = ds->last_br_bytes / d;
2745 ds->last_br_ts = ts;
2746 ds->last_br_bytes = 0;
2747 }
2748 }
2749 ds->last_br_bytes += pkt->len;
2750
2751 // This implies this function is actually called from "the" user thread.
2752 if (pkt->pos >= in->d_user->filepos)
2753 in->d_user->filepos = pkt->pos;
2754 in->d_user->filesize = in->stream_size;
2755
2756 pkt->pts = MP_ADD_PTS(pkt->pts, in->ts_offset);
2757 pkt->dts = MP_ADD_PTS(pkt->dts, in->ts_offset);
2758
2759 if (pkt->segmented) {
2760 pkt->start = MP_ADD_PTS(pkt->start, in->ts_offset);
2761 pkt->end = MP_ADD_PTS(pkt->end, in->ts_offset);
2762 }
2763
2764 prune_old_packets(in);
2765 *res = pkt;
2766 return 1;
2767 }
2768
2769 // Poll the demuxer queue, and if there's a packet, return it. Otherwise, just
2770 // make the demuxer thread read packets for this stream, and if there's at
2771 // least one packet, call the wakeup callback.
2772 // This enables readahead if it wasn't yet (except for interleaved subtitles).
2773 // Returns:
2774 // < 0: EOF was reached, *out_pkt=NULL
2775 // == 0: no new packet yet, but maybe later, *out_pkt=NULL
2776 // > 0: new packet read, *out_pkt is set
2777 // Note: when reading interleaved subtitles, the demuxer won't try to forcibly
2778 // read ahead to get the next subtitle packet (as the next packet could be
2779 // minutes away). In this situation, this function will just return -1.
demux_read_packet_async(struct sh_stream * sh,struct demux_packet ** out_pkt)2780 int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
2781 {
2782 return demux_read_packet_async_until(sh, MP_NOPTS_VALUE, out_pkt);
2783 }
2784
2785 // Like demux_read_packet_async(). They are the same for min_pts==MP_NOPTS_VALUE.
2786 // If min_pts is set, and the stream is lazily read (eager=false, interleaved
2787 // subtitles), then return 0 until demuxing has reached min_pts, or the queue
2788 // overflowed, or EOF was reached, or a packet was read for this stream.
demux_read_packet_async_until(struct sh_stream * sh,double min_pts,struct demux_packet ** out_pkt)2789 int demux_read_packet_async_until(struct sh_stream *sh, double min_pts,
2790 struct demux_packet **out_pkt)
2791 {
2792 struct demux_stream *ds = sh ? sh->ds : NULL;
2793 *out_pkt = NULL;
2794 if (!ds)
2795 return -1;
2796 struct demux_internal *in = ds->in;
2797
2798 pthread_mutex_lock(&in->lock);
2799 int r = -1;
2800 while (1) {
2801 r = dequeue_packet(ds, min_pts, out_pkt);
2802 if (in->threading || in->blocked || r != 0)
2803 break;
2804 // Needs to actually read packets until we got a packet or EOF.
2805 thread_work(in);
2806 }
2807 pthread_mutex_unlock(&in->lock);
2808 return r;
2809 }
2810
2811 // Read and return any packet we find. NULL means EOF.
2812 // Does not work with threading (don't call demux_start_thread()).
demux_read_any_packet(struct demuxer * demuxer)2813 struct demux_packet *demux_read_any_packet(struct demuxer *demuxer)
2814 {
2815 struct demux_internal *in = demuxer->in;
2816 pthread_mutex_lock(&in->lock);
2817 assert(!in->threading); // doesn't work with threading
2818 struct demux_packet *out_pkt = NULL;
2819 bool read_more = true;
2820 while (read_more && !in->blocked) {
2821 bool all_eof = true;
2822 for (int n = 0; n < in->num_streams; n++) {
2823 int r = dequeue_packet(in->streams[n]->ds, MP_NOPTS_VALUE, &out_pkt);
2824 if (r > 0)
2825 goto done;
2826 if (r == 0)
2827 all_eof = false;
2828 }
2829 // retry after calling this
2830 read_more = thread_work(in);
2831 read_more &= !all_eof;
2832 }
2833 done:
2834 pthread_mutex_unlock(&in->lock);
2835 return out_pkt;
2836 }
2837
demuxer_help(struct mp_log * log,const m_option_t * opt,struct bstr name)2838 int demuxer_help(struct mp_log *log, const m_option_t *opt, struct bstr name)
2839 {
2840 int i;
2841
2842 mp_info(log, "Available demuxers:\n");
2843 mp_info(log, " demuxer: info:\n");
2844 for (i = 0; demuxer_list[i]; i++) {
2845 mp_info(log, "%10s %s\n",
2846 demuxer_list[i]->name, demuxer_list[i]->desc);
2847 }
2848 mp_info(log, "\n");
2849
2850 return M_OPT_EXIT;
2851 }
2852
d_level(enum demux_check level)2853 static const char *d_level(enum demux_check level)
2854 {
2855 switch (level) {
2856 case DEMUX_CHECK_FORCE: return "force";
2857 case DEMUX_CHECK_UNSAFE: return "unsafe";
2858 case DEMUX_CHECK_REQUEST:return "request";
2859 case DEMUX_CHECK_NORMAL: return "normal";
2860 }
2861 abort();
2862 }
2863
// Parse the floating point number at the start of str and store it in *out.
// Anything following the number is ignored (no check is made on the rest of
// the string).
// Returns 0 on success. Returns -1, leaving *out untouched, if str does not
// start with a number or the value is not finite (inf, nan, or too large for
// a float).
static int decode_float(char *str, float *out)
{
    char *end = NULL;

    // Parse directly as float: strtof() reports overflow as HUGE_VALF, so no
    // out-of-range double->float narrowing happens before the isfinite() check.
    float value = strtof(str, &end);

    // end == str means no conversion was performed at all.
    if (end == str || !isfinite(value))
        return -1;

    *out = value;
    return 0;
}
2876
decode_gain(struct mp_log * log,struct mp_tags * tags,const char * tag,float * out)2877 static int decode_gain(struct mp_log *log, struct mp_tags *tags,
2878 const char *tag, float *out)
2879 {
2880 char *tag_val = NULL;
2881 float dec_val;
2882
2883 tag_val = mp_tags_get_str(tags, tag);
2884 if (!tag_val)
2885 return -1;
2886
2887 if (decode_float(tag_val, &dec_val) < 0) {
2888 mp_msg(log, MSGL_ERR, "Invalid replaygain value\n");
2889 return -1;
2890 }
2891
2892 *out = dec_val;
2893 return 0;
2894 }
2895
// Look up tag in tags and parse it as a peak value into *out.
// A missing tag is not an error: *out is set to 1.0 and 0 is returned.
// Returns -1 (with *out left at 1.0) if the tag exists but can't be parsed or
// is not a positive number. log is not used.
static int decode_peak(struct mp_log *log, struct mp_tags *tags,
                       const char *tag, float *out)
{
    // Default used for a missing tag.
    *out = 1.0;

    char *text = mp_tags_get_str(tags, tag);
    if (!text)
        return 0;

    float peak;
    if (decode_float(text, &peak) < 0)
        return -1;
    if (peak <= 0.0)
        return -1;

    *out = peak;
    return 0;
}
2914
decode_rgain(struct mp_log * log,struct mp_tags * tags)2915 static struct replaygain_data *decode_rgain(struct mp_log *log,
2916 struct mp_tags *tags)
2917 {
2918 struct replaygain_data rg = {0};
2919
2920 // Set values in *rg, using track gain as a fallback for album gain if the
2921 // latter is not present. This behavior matches that in demux/demux_lavf.c's
2922 // export_replaygain; if you change this, please make equivalent changes
2923 // there too.
2924 if (decode_gain(log, tags, "REPLAYGAIN_TRACK_GAIN", &rg.track_gain) >= 0 &&
2925 decode_peak(log, tags, "REPLAYGAIN_TRACK_PEAK", &rg.track_peak) >= 0)
2926 {
2927 if (decode_gain(log, tags, "REPLAYGAIN_ALBUM_GAIN", &rg.album_gain) < 0 ||
2928 decode_peak(log, tags, "REPLAYGAIN_ALBUM_PEAK", &rg.album_peak) < 0)
2929 {
2930 // Album gain is undefined; fall back to track gain.
2931 rg.album_gain = rg.track_gain;
2932 rg.album_peak = rg.track_peak;
2933 }
2934 return talloc_dup(NULL, &rg);
2935 }
2936
2937 if (decode_gain(log, tags, "REPLAYGAIN_GAIN", &rg.track_gain) >= 0 &&
2938 decode_peak(log, tags, "REPLAYGAIN_PEAK", &rg.track_peak) >= 0)
2939 {
2940 rg.album_gain = rg.track_gain;
2941 rg.album_peak = rg.track_peak;
2942 return talloc_dup(NULL, &rg);
2943 }
2944
2945 return NULL;
2946 }
2947
// For every audio stream that has no replaygain data yet, try to derive it
// from the stream's own tags first, then from the demuxer-wide metadata.
// Decoded data is reparented to the demux_internal struct.
static void demux_update_replaygain(demuxer_t *demuxer)
{
    struct demux_internal *in = demuxer->in;

    for (int i = 0; i < in->num_streams; i++) {
        struct sh_stream *sh = in->streams[i];
        if (sh->type != STREAM_AUDIO || sh->codec->replaygain_data)
            continue;

        struct replaygain_data *rg = decode_rgain(demuxer->log, sh->tags);
        if (!rg)
            rg = decode_rgain(demuxer->log, demuxer->metadata);
        if (!rg)
            continue;

        sh->codec->replaygain_data = talloc_steal(in, rg);
    }
}
2962
2963 // Copy some fields from src to dst (for initialization).
demux_copy(struct demuxer * dst,struct demuxer * src)2964 static void demux_copy(struct demuxer *dst, struct demuxer *src)
2965 {
2966 // Note that we do as shallow copies as possible. We expect the data
2967 // that is not-copied (only referenced) to be immutable.
2968 // This implies e.g. that no chapters are added after initialization.
2969 dst->chapters = src->chapters;
2970 dst->num_chapters = src->num_chapters;
2971 dst->editions = src->editions;
2972 dst->num_editions = src->num_editions;
2973 dst->edition = src->edition;
2974 dst->attachments = src->attachments;
2975 dst->num_attachments = src->num_attachments;
2976 dst->matroska_data = src->matroska_data;
2977 dst->playlist = src->playlist;
2978 dst->seekable = src->seekable;
2979 dst->partially_seekable = src->partially_seekable;
2980 dst->filetype = src->filetype;
2981 dst->ts_resets_possible = src->ts_resets_possible;
2982 dst->fully_read = src->fully_read;
2983 dst->start_time = src->start_time;
2984 dst->duration = src->duration;
2985 dst->is_network = src->is_network;
2986 dst->is_streaming = src->is_streaming;
2987 dst->stream_origin = src->stream_origin;
2988 dst->priv = src->priv;
2989 dst->metadata = mp_tags_dup(dst, src->metadata);
2990 }
2991
2992 // Update metadata after initialization. If sh==NULL, it's global metadata,
2993 // otherwise it's bound to the stream. If pts==NOPTS, use the highest known pts
2994 // in the stream. Caller retains ownership of tags ptr. Called locked.
add_timed_metadata(struct demux_internal * in,struct mp_tags * tags,struct sh_stream * sh,double pts)2995 static void add_timed_metadata(struct demux_internal *in, struct mp_tags *tags,
2996 struct sh_stream *sh, double pts)
2997 {
2998 struct demux_cached_range *r = in->current_range;
2999 if (!r)
3000 return;
3001
3002 // We don't expect this, nor do we find it useful.
3003 if (sh && sh != in->metadata_stream)
3004 return;
3005
3006 if (pts == MP_NOPTS_VALUE) {
3007 for (int n = 0; n < r->num_streams; n++)
3008 pts = MP_PTS_MAX(pts, r->streams[n]->last_ts);
3009
3010 // Tends to happen when doing the initial icy update.
3011 if (pts == MP_NOPTS_VALUE)
3012 pts = in->d_thread->start_time;
3013 }
3014
3015 struct timed_metadata *tm = talloc_zero(NULL, struct timed_metadata);
3016 *tm = (struct timed_metadata){
3017 .pts = pts,
3018 .tags = mp_tags_dup(tm, tags),
3019 .from_stream = !!sh,
3020 };
3021 MP_TARRAY_APPEND(r, r->metadata, r->num_metadata, tm);
3022 }
3023
3024 // This is called by demuxer implementations if sh->tags changed. Note that
3025 // sh->tags itself is never actually changed (it's immutable, because sh->tags
3026 // can be accessed by the playback thread, and there is no synchronization).
3027 // pts is the time at/after which the metadata becomes effective. You're
3028 // supposed to call this ordered by time, and only while a packet is being
3029 // read.
3030 // Ownership of tags goes to the function.
demux_stream_tags_changed(struct demuxer * demuxer,struct sh_stream * sh,struct mp_tags * tags,double pts)3031 void demux_stream_tags_changed(struct demuxer *demuxer, struct sh_stream *sh,
3032 struct mp_tags *tags, double pts)
3033 {
3034 struct demux_internal *in = demuxer->in;
3035 assert(demuxer == in->d_thread);
3036 struct demux_stream *ds = sh ? sh->ds : NULL;
3037 assert(!sh || ds); // stream must have been added
3038
3039 pthread_mutex_lock(&in->lock);
3040
3041 if (pts == MP_NOPTS_VALUE) {
3042 MP_WARN(in, "Discarding timed metadata without timestamp.\n");
3043 } else {
3044 add_timed_metadata(in, tags, sh, pts);
3045 }
3046 talloc_free(tags);
3047
3048 pthread_mutex_unlock(&in->lock);
3049 }
3050
3051 // This is called by demuxer implementations if demuxer->metadata changed.
3052 // (It will be propagated to the user as timed metadata.)
demux_metadata_changed(demuxer_t * demuxer)3053 void demux_metadata_changed(demuxer_t *demuxer)
3054 {
3055 assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only
3056 struct demux_internal *in = demuxer->in;
3057
3058 pthread_mutex_lock(&in->lock);
3059 add_timed_metadata(in, demuxer->metadata, NULL, MP_NOPTS_VALUE);
3060 pthread_mutex_unlock(&in->lock);
3061 }
3062
3063 // Called locked, with user demuxer.
update_final_metadata(demuxer_t * demuxer,struct timed_metadata * tm)3064 static void update_final_metadata(demuxer_t *demuxer, struct timed_metadata *tm)
3065 {
3066 assert(demuxer == demuxer->in->d_user);
3067 struct demux_internal *in = demuxer->in;
3068
3069 struct mp_tags *dyn_tags = NULL;
3070
3071 // Often useful for audio-only files, which have metadata in the audio track
3072 // metadata instead of the main metadata, but can also have cover art
3073 // metadata (which libavformat likes to treat as video streams).
3074 int astreams = 0;
3075 int astream_id = -1;
3076 int vstreams = 0;
3077 for (int n = 0; n < in->num_streams; n++) {
3078 struct sh_stream *sh = in->streams[n];
3079 if (sh->type == STREAM_VIDEO && !sh->attached_picture)
3080 vstreams += 1;
3081 if (sh->type == STREAM_AUDIO) {
3082 astreams += 1;
3083 astream_id = n;
3084 }
3085 }
3086
3087 // Use the metadata_stream tags only if this really seems to be an audio-
3088 // only stream. Otherwise it will happen too often that "uninteresting"
3089 // stream metadata will trash the actual file tags.
3090 if (vstreams == 0 && astreams == 1 &&
3091 in->streams[astream_id] == in->metadata_stream)
3092 {
3093 dyn_tags = in->metadata_stream->tags;
3094 if (tm && tm->from_stream)
3095 dyn_tags = tm->tags;
3096 }
3097
3098 // Global metadata updates.
3099 if (tm && !tm->from_stream)
3100 dyn_tags = tm->tags;
3101
3102 if (dyn_tags)
3103 mp_tags_merge(demuxer->metadata, dyn_tags);
3104 }
3105
// Return the timed metadata entry of the current range selected for pts, or
// NULL if there is no current range, the range has no metadata entries, or pts
// is MP_NOPTS_VALUE.
// The selected entry is the one right before the first scanned entry whose pts
// is >= the requested pts; if no scanned entry qualifies, the last entry is
// selected. The chosen index is stored in in->cached_metadata_index and is
// used as the starting point for the next lookup.
static struct timed_metadata *lookup_timed_metadata(struct demux_internal *in,
                                                    double pts)
{
    struct demux_cached_range *r = in->current_range;

    if (!r || !r->num_metadata || pts == MP_NOPTS_VALUE)
        return NULL;

    // By default scanning starts at index 1, so entry 0 is the lowest possible
    // result. If the index cached by the previous lookup is still valid and
    // that entry is not after pts, continue scanning right behind it instead.
    int start = 1;
    int i = in->cached_metadata_index;
    if (i >= 0 && i < r->num_metadata && r->metadata[i]->pts <= pts)
        start = i + 1;

    // Assume the last entry, unless the scan below finds an entry at or past
    // pts, in which case the entry before that one is taken.
    in->cached_metadata_index = r->num_metadata - 1;
    for (int n = start; n < r->num_metadata; n++) {
        if (r->metadata[n]->pts >= pts) {
            in->cached_metadata_index = n - 1;
            break;
        }
    }

    return r->metadata[in->cached_metadata_index];
}
3129
3130 // Called by the user thread (i.e. player) to update metadata and other things
3131 // from the demuxer thread.
3132 // The pts parameter is the current playback position.
demux_update(demuxer_t * demuxer,double pts)3133 void demux_update(demuxer_t *demuxer, double pts)
3134 {
3135 assert(demuxer == demuxer->in->d_user);
3136 struct demux_internal *in = demuxer->in;
3137
3138 pthread_mutex_lock(&in->lock);
3139
3140 if (!in->threading)
3141 update_cache(in);
3142
3143 // This implies this function is actually called from "the" user thread.
3144 in->d_user->filesize = in->stream_size;
3145
3146 pts = MP_ADD_PTS(pts, -in->ts_offset);
3147
3148 struct timed_metadata *prev = lookup_timed_metadata(in, in->last_playback_pts);
3149 struct timed_metadata *cur = lookup_timed_metadata(in, pts);
3150 if (prev != cur || in->force_metadata_update) {
3151 in->force_metadata_update = false;
3152 update_final_metadata(demuxer, cur);
3153 demuxer->events |= DEMUX_EVENT_METADATA;
3154 }
3155
3156 in->last_playback_pts = pts;
3157
3158 demuxer->events |= in->events;
3159 in->events = 0;
3160 if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS))
3161 demux_update_replaygain(demuxer);
3162 if (demuxer->events & DEMUX_EVENT_DURATION)
3163 demuxer->duration = in->duration;
3164
3165 pthread_mutex_unlock(&in->lock);
3166 }
3167
demux_init_cuesheet(struct demuxer * demuxer)3168 static void demux_init_cuesheet(struct demuxer *demuxer)
3169 {
3170 char *cue = mp_tags_get_str(demuxer->metadata, "cuesheet");
3171 if (cue && !demuxer->num_chapters) {
3172 struct cue_file *f = mp_parse_cue(bstr0(cue));
3173 if (f) {
3174 if (mp_check_embedded_cue(f) < 0) {
3175 MP_WARN(demuxer, "Embedded cue sheet references more than one file. "
3176 "Ignoring it.\n");
3177 } else {
3178 for (int n = 0; n < f->num_tracks; n++) {
3179 struct cue_track *t = &f->tracks[n];
3180 int idx = demuxer_add_chapter(demuxer, "", t->start, -1);
3181 mp_tags_merge(demuxer->chapters[idx].metadata, t->tags);
3182 }
3183 }
3184 }
3185 talloc_free(f);
3186 }
3187 }
3188
3189 // A demuxer can use this during opening if all data was read from the stream.
3190 // Calling this after opening was completed is not allowed. Also, if opening
3191 // failed, this must not be called (or trying another demuxer would fail).
3192 // Useful so that e.g. subtitles don't keep the file or socket open.
3193 // If there's ever the situation where we can't allow the demuxer to close
3194 // the stream, this function could ignore the request.
demux_close_stream(struct demuxer * demuxer)3195 void demux_close_stream(struct demuxer *demuxer)
3196 {
3197 struct demux_internal *in = demuxer->in;
3198 assert(!in->threading && demuxer == in->d_thread);
3199
3200 if (!demuxer->stream || !in->owns_stream)
3201 return;
3202
3203 MP_VERBOSE(demuxer, "demuxer read all data; closing stream\n");
3204 free_stream(demuxer->stream);
3205 demuxer->stream = NULL;
3206 in->d_user->stream = NULL;
3207 }
3208
demux_init_ccs(struct demuxer * demuxer,struct demux_opts * opts)3209 static void demux_init_ccs(struct demuxer *demuxer, struct demux_opts *opts)
3210 {
3211 struct demux_internal *in = demuxer->in;
3212 if (!opts->create_ccs)
3213 return;
3214 pthread_mutex_lock(&in->lock);
3215 for (int n = 0; n < in->num_streams; n++) {
3216 struct sh_stream *sh = in->streams[n];
3217 if (sh->type == STREAM_VIDEO && !sh->attached_picture)
3218 demuxer_get_cc_track_locked(sh);
3219 }
3220 pthread_mutex_unlock(&in->lock);
3221 }
3222
3223 // Return whether "heavy" caching on this stream is enabled. By default, this
3224 // corresponds to whether the source stream is considered in the network. The
3225 // only effect should be adjusting display behavior (of cache stats etc.), and
3226 // possibly switching between which set of options influence cache settings.
demux_is_network_cached(demuxer_t * demuxer)3227 bool demux_is_network_cached(demuxer_t *demuxer)
3228 {
3229 struct demux_internal *in = demuxer->in;
3230 pthread_mutex_lock(&in->lock);
3231 bool r = in->using_network_cache_opts;
3232 pthread_mutex_unlock(&in->lock);
3233 return r;
3234 }
3235
// Properties of the source stream, captured once by demux_open() and passed to
// open_given_type(), which copies them into each demuxer it creates.
struct parent_stream_info {
    bool seekable;              // from stream->seekable
    bool is_network;            // from stream->is_network
    bool is_streaming;          // from stream->streaming
    int stream_origin;          // from stream->stream_origin
    struct mp_cancel *cancel;   // cancel object given to demux_open()
    char *filename;             // copy of stream->url, freed by demux_open()
};
3244
open_given_type(struct mpv_global * global,struct mp_log * log,const struct demuxer_desc * desc,struct stream * stream,struct parent_stream_info * sinfo,struct demuxer_params * params,enum demux_check check)3245 static struct demuxer *open_given_type(struct mpv_global *global,
3246 struct mp_log *log,
3247 const struct demuxer_desc *desc,
3248 struct stream *stream,
3249 struct parent_stream_info *sinfo,
3250 struct demuxer_params *params,
3251 enum demux_check check)
3252 {
3253 if (mp_cancel_test(sinfo->cancel))
3254 return NULL;
3255
3256 struct demuxer *demuxer = talloc_ptrtype(NULL, demuxer);
3257 struct m_config_cache *opts_cache =
3258 m_config_cache_alloc(demuxer, global, &demux_conf);
3259 struct demux_opts *opts = opts_cache->opts;
3260 *demuxer = (struct demuxer) {
3261 .desc = desc,
3262 .stream = stream,
3263 .cancel = sinfo->cancel,
3264 .seekable = sinfo->seekable,
3265 .filepos = -1,
3266 .global = global,
3267 .log = mp_log_new(demuxer, log, desc->name),
3268 .glog = log,
3269 .filename = talloc_strdup(demuxer, sinfo->filename),
3270 .is_network = sinfo->is_network,
3271 .is_streaming = sinfo->is_streaming,
3272 .stream_origin = sinfo->stream_origin,
3273 .access_references = opts->access_references,
3274 .events = DEMUX_EVENT_ALL,
3275 .duration = -1,
3276 };
3277
3278 struct demux_internal *in = demuxer->in = talloc_ptrtype(demuxer, in);
3279 *in = (struct demux_internal){
3280 .global = global,
3281 .log = demuxer->log,
3282 .stats = stats_ctx_create(in, global, "demuxer"),
3283 .can_cache = params && params->is_top_level,
3284 .can_record = params && params->stream_record,
3285 .opts = opts,
3286 .opts_cache = opts_cache,
3287 .d_thread = talloc(demuxer, struct demuxer),
3288 .d_user = demuxer,
3289 .after_seek = true, // (assumed identical to initial demuxer state)
3290 .after_seek_to_start = true,
3291 .highest_av_pts = MP_NOPTS_VALUE,
3292 .seeking_in_progress = MP_NOPTS_VALUE,
3293 .demux_ts = MP_NOPTS_VALUE,
3294 .owns_stream = !params->external_stream,
3295 };
3296 pthread_mutex_init(&in->lock, NULL);
3297 pthread_cond_init(&in->wakeup, NULL);
3298
3299 *in->d_thread = *demuxer;
3300
3301 in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags);
3302
3303 mp_dbg(log, "Trying demuxer: %s (force-level: %s)\n",
3304 desc->name, d_level(check));
3305
3306 if (stream)
3307 stream_seek(stream, 0);
3308
3309 in->d_thread->params = params; // temporary during open()
3310 int ret = demuxer->desc->open(in->d_thread, check);
3311 if (ret >= 0) {
3312 in->d_thread->params = NULL;
3313 if (in->d_thread->filetype)
3314 mp_verbose(log, "Detected file format: %s (%s)\n",
3315 in->d_thread->filetype, desc->desc);
3316 else
3317 mp_verbose(log, "Detected file format: %s\n", desc->desc);
3318 if (!in->d_thread->seekable)
3319 mp_verbose(log, "Stream is not seekable.\n");
3320 if (!in->d_thread->seekable && opts->force_seekable) {
3321 mp_warn(log, "Not seekable, but enabling seeking on user request.\n");
3322 in->d_thread->seekable = true;
3323 in->d_thread->partially_seekable = true;
3324 }
3325 demux_init_cuesheet(in->d_thread);
3326 demux_init_ccs(demuxer, opts);
3327 demux_convert_tags_charset(in->d_thread);
3328 demux_copy(in->d_user, in->d_thread);
3329 in->duration = in->d_thread->duration;
3330 demuxer_sort_chapters(demuxer);
3331 in->events = DEMUX_EVENT_ALL;
3332
3333 struct demuxer *sub = NULL;
3334 if (!(params && params->disable_timeline)) {
3335 struct timeline *tl = timeline_load(global, log, demuxer);
3336 if (tl) {
3337 struct demuxer_params params2 = {0};
3338 params2.timeline = tl;
3339 params2.is_top_level = params && params->is_top_level;
3340 params2.stream_record = params && params->stream_record;
3341 sub =
3342 open_given_type(global, log, &demuxer_desc_timeline,
3343 NULL, sinfo, ¶ms2, DEMUX_CHECK_FORCE);
3344 if (sub) {
3345 in->can_cache = false;
3346 in->can_record = false;
3347 } else {
3348 timeline_destroy(tl);
3349 }
3350 }
3351 }
3352
3353 switch_to_fresh_cache_range(in);
3354
3355 update_opts(in);
3356
3357 demux_update(demuxer, MP_NOPTS_VALUE);
3358
3359 demuxer = sub ? sub : demuxer;
3360 return demuxer;
3361 }
3362
3363 demuxer->stream = NULL;
3364 demux_free(demuxer);
3365 return NULL;
3366 }
3367
// Lists of detection levels that demux_open() tries in order, one pass over
// all demuxers per entry. Each list is terminated by -1.
// d_normal is the default, d_request is used when a format was requested, and
// d_force when the requested format name was prefixed with '+'.
static const int d_normal[] = {DEMUX_CHECK_NORMAL, DEMUX_CHECK_UNSAFE, -1};
static const int d_request[] = {DEMUX_CHECK_REQUEST, -1};
static const int d_force[] = {DEMUX_CHECK_FORCE, -1};
3371
3372 // params can be NULL
3373 // This may free the stream parameter on success.
demux_open(struct stream * stream,struct mp_cancel * cancel,struct demuxer_params * params,struct mpv_global * global)3374 static struct demuxer *demux_open(struct stream *stream,
3375 struct mp_cancel *cancel,
3376 struct demuxer_params *params,
3377 struct mpv_global *global)
3378 {
3379 const int *check_levels = d_normal;
3380 const struct demuxer_desc *check_desc = NULL;
3381 struct mp_log *log = mp_log_new(NULL, global->log, "!demux");
3382 struct demuxer *demuxer = NULL;
3383 char *force_format = params ? params->force_format : NULL;
3384
3385 if (!force_format)
3386 force_format = stream->demuxer;
3387
3388 if (force_format && force_format[0] && !stream->is_directory) {
3389 check_levels = d_request;
3390 if (force_format[0] == '+') {
3391 force_format += 1;
3392 check_levels = d_force;
3393 }
3394 for (int n = 0; demuxer_list[n]; n++) {
3395 if (strcmp(demuxer_list[n]->name, force_format) == 0)
3396 check_desc = demuxer_list[n];
3397 }
3398 if (!check_desc) {
3399 mp_err(log, "Demuxer %s does not exist.\n", force_format);
3400 goto done;
3401 }
3402 }
3403
3404 struct parent_stream_info sinfo = {
3405 .seekable = stream->seekable,
3406 .is_network = stream->is_network,
3407 .is_streaming = stream->streaming,
3408 .stream_origin = stream->stream_origin,
3409 .cancel = cancel,
3410 .filename = talloc_strdup(NULL, stream->url),
3411 };
3412
3413 // Test demuxers from first to last, one pass for each check_levels[] entry
3414 for (int pass = 0; check_levels[pass] != -1; pass++) {
3415 enum demux_check level = check_levels[pass];
3416 mp_verbose(log, "Trying demuxers for level=%s.\n", d_level(level));
3417 for (int n = 0; demuxer_list[n]; n++) {
3418 const struct demuxer_desc *desc = demuxer_list[n];
3419 if (!check_desc || desc == check_desc) {
3420 demuxer = open_given_type(global, log, desc, stream, &sinfo,
3421 params, level);
3422 if (demuxer) {
3423 talloc_steal(demuxer, log);
3424 log = NULL;
3425 goto done;
3426 }
3427 }
3428 }
3429 }
3430
3431 done:
3432 talloc_free(sinfo.filename);
3433 talloc_free(log);
3434 return demuxer;
3435 }
3436
// Create a stream that first yields the bytes of init (via a memory stream)
// and then the contents of real. Returns the concatenated stream; if creating
// it fails, both the memory stream and real are freed and NULL is returned.
static struct stream *create_webshit_concat_stream(struct mpv_global *global,
                                                   struct mp_cancel *c,
                                                   bstr init, struct stream *real)
{
    struct stream *init_stream =
        stream_memory_open(global, init.start, init.len);
    assert(init_stream);

    struct stream *parts[2] = {init_stream, real};
    struct stream *joined = stream_concat_open(global, c, parts, 2);
    if (joined)
        return joined;

    free_stream(init_stream);
    free_stream(real);
    return NULL;
}
3452
3453 // Convenience function: open the stream, enable the cache (according to params
3454 // and global opts.), open the demuxer.
3455 // Also for some reason may close the opened stream if it's not needed.
3456 // demuxer->cancel is not the cancel parameter, but is its own object that will
3457 // be a slave (mp_cancel_set_parent()) to provided cancel object.
3458 // demuxer->cancel is automatically freed.
demux_open_url(const char * url,struct demuxer_params * params,struct mp_cancel * cancel,struct mpv_global * global)3459 struct demuxer *demux_open_url(const char *url,
3460 struct demuxer_params *params,
3461 struct mp_cancel *cancel,
3462 struct mpv_global *global)
3463 {
3464 if (!params)
3465 return NULL;
3466 struct mp_cancel *priv_cancel = mp_cancel_new(NULL);
3467 if (cancel)
3468 mp_cancel_set_parent(priv_cancel, cancel);
3469 struct stream *s = params->external_stream;
3470 if (!s) {
3471 s = stream_create(url, STREAM_READ | params->stream_flags,
3472 priv_cancel, global);
3473 if (s && params->init_fragment.len) {
3474 s = create_webshit_concat_stream(global, priv_cancel,
3475 params->init_fragment, s);
3476 }
3477 }
3478 if (!s) {
3479 talloc_free(priv_cancel);
3480 return NULL;
3481 }
3482 struct demuxer *d = demux_open(s, priv_cancel, params, global);
3483 if (d) {
3484 talloc_steal(d->in, priv_cancel);
3485 assert(d->cancel);
3486 } else {
3487 params->demuxer_failed = true;
3488 if (!params->external_stream)
3489 free_stream(s);
3490 talloc_free(priv_cancel);
3491 }
3492 return d;
3493 }
3494
3495 // clear the packet queues
demux_flush(demuxer_t * demuxer)3496 void demux_flush(demuxer_t *demuxer)
3497 {
3498 struct demux_internal *in = demuxer->in;
3499 assert(demuxer == in->d_user);
3500
3501 pthread_mutex_lock(&in->lock);
3502 clear_reader_state(in, true);
3503 for (int n = 0; n < in->num_ranges; n++)
3504 clear_cached_range(in, in->ranges[n]);
3505 free_empty_cached_ranges(in);
3506 for (int n = 0; n < in->num_streams; n++) {
3507 struct demux_stream *ds = in->streams[n]->ds;
3508 ds->refreshing = false;
3509 ds->eof = false;
3510 }
3511 in->eof = false;
3512 in->seeking = false;
3513 pthread_mutex_unlock(&in->lock);
3514 }
3515
3516 // Does some (but not all) things for switching to another range.
switch_current_range(struct demux_internal * in,struct demux_cached_range * range)3517 static void switch_current_range(struct demux_internal *in,
3518 struct demux_cached_range *range)
3519 {
3520 struct demux_cached_range *old = in->current_range;
3521 assert(old != range);
3522
3523 set_current_range(in, range);
3524
3525 if (old) {
3526 // Remove packets which can't be used when seeking back to the range.
3527 for (int n = 0; n < in->num_streams; n++) {
3528 struct demux_queue *queue = old->streams[n];
3529
3530 // Remove all packets which cannot be involved in seeking.
3531 while (queue->head && !queue->head->keyframe)
3532 remove_head_packet(queue);
3533 }
3534
3535 // Exclude weird corner cases that break resuming.
3536 for (int n = 0; n < in->num_streams; n++) {
3537 struct demux_stream *ds = in->streams[n]->ds;
3538 // This is needed to resume or join the range at all.
3539 if (ds->selected && !(ds->global_correct_dts ||
3540 ds->global_correct_pos))
3541 {
3542 MP_VERBOSE(in, "discarding unseekable range due to stream %d\n", n);
3543 clear_cached_range(in, old);
3544 break;
3545 }
3546 }
3547 }
3548
3549 // Set up reading from new range (as well as writing to it).
3550 for (int n = 0; n < in->num_streams; n++) {
3551 struct demux_stream *ds = in->streams[n]->ds;
3552
3553 ds->queue = range->streams[n];
3554 ds->refreshing = false;
3555 ds->eof = false;
3556 }
3557
3558 // No point in keeping any junk (especially if old current_range is empty).
3559 free_empty_cached_ranges(in);
3560
3561 // The change detection doesn't work across ranges.
3562 in->force_metadata_update = true;
3563 }
3564
// Search for the entry with the highest index with entry.pts <= pts true.
// This is a binary search over the queue's index entries (accessed through
// QUEUE_INDEX_ENTRY()), which assumes the entries are ordered by pts.
// Returns the packet of that entry, or NULL if the index is empty or no
// entry satisfies the condition.
static struct demux_packet *search_index(struct demux_queue *queue, double pts)
{
    // Candidate interval is [a, b).
    size_t a = 0;
    size_t b = queue->num_index;

    while (a < b) {
        size_t m = a + (b - a) / 2;
        struct index_entry *e = &QUEUE_INDEX_ENTRY(queue, m);

        bool m_ok = e->pts <= pts;

        // Only one candidate left (then m == a): it is the result if it
        // satisfies the condition, otherwise there is no result.
        if (a + 1 == b)
            return m_ok ? e->pkt : NULL;

        if (m_ok) {
            // m itself may be the result, so it stays in the interval.
            a = m;
        } else {
            // m and everything after it is excluded.
            b = m;
        }
    }

    return NULL;
}
3589
// Find the keyframe packet in queue to start reading from when seeking to pts.
// The stream's seek_preroll is subtracted from pts first.
// With SEEK_FORWARD in flags, the result is the first keyframe whose range
// timestamp is >= pts. Otherwise it is the last keyframe before the first one
// whose range timestamp is > pts.
// Returns NULL if no keyframe with a known timestamp qualifies.
static struct demux_packet *find_seek_target(struct demux_queue *queue,
                                             double pts, int flags)
{
    pts -= queue->ds->sh->seek_preroll;

    // Use the index to skip ahead; without a matching index entry, scan from
    // the start of the queue.
    struct demux_packet *start = search_index(queue, pts);
    if (!start)
        start = queue->head;

    struct demux_packet *target = NULL;
    struct demux_packet *next = NULL;
    for (struct demux_packet *dp = start; dp; dp = next) {
        next = dp->next;
        if (!dp->keyframe)
            continue;

        // The return value of compute_keyframe_times() is used as the next
        // packet to visit (presumably the start of the next keyframe range -
        // confirm against its definition).
        double range_pts;
        next = compute_keyframe_times(dp, &range_pts, NULL);

        // Keyframes without a known timestamp can't be seek targets.
        if (range_pts == MP_NOPTS_VALUE)
            continue;

        if (flags & SEEK_FORWARD) {
            // Stop on the first packet that is >= pts.
            if (target)
                break;
            if (range_pts < pts)
                continue;
        } else {
            // Stop before the first packet that is > pts.
            // This still returns a packet with > pts if there's no better one.
            if (target && range_pts > pts)
                break;
        }

        target = dp;
    }

    return target;
}
3630
3631 // Return a cache range for the given pts/flags, or NULL if none available.
3632 // must be called locked
find_cache_seek_range(struct demux_internal * in,double pts,int flags)3633 static struct demux_cached_range *find_cache_seek_range(struct demux_internal *in,
3634 double pts, int flags)
3635 {
3636 // Note about queued low level seeks: in->seeking can be true here, and it
3637 // might come from a previous resume seek to the current range. If we end
3638 // up seeking into the current range (i.e. just changing time offset), the
3639 // seek needs to continue. Otherwise, we override the queued seek anyway.
3640 if ((flags & SEEK_FACTOR) || !in->seekable_cache)
3641 return NULL;
3642
3643 struct demux_cached_range *res = NULL;
3644
3645 for (int n = 0; n < in->num_ranges; n++) {
3646 struct demux_cached_range *r = in->ranges[n];
3647 if (r->seek_start != MP_NOPTS_VALUE) {
3648 MP_VERBOSE(in, "cached range %d: %f <-> %f (bof=%d, eof=%d)\n",
3649 n, r->seek_start, r->seek_end, r->is_bof, r->is_eof);
3650
3651 if ((pts >= r->seek_start || r->is_bof) &&
3652 (pts <= r->seek_end || r->is_eof))
3653 {
3654 MP_VERBOSE(in, "...using this range for in-cache seek.\n");
3655 res = r;
3656 break;
3657 }
3658 }
3659 }
3660
3661 return res;
3662 }
3663
3664 // Adjust the seek target to the found video key frames. Otherwise the
3665 // video will undershoot the seek target, while audio will be closer to it.
3666 // The player frontend will play the additional video without audio, so
3667 // you get silent audio for the amount of "undershoot". Adjusting the seek
3668 // target will make the audio seek to the video target or before.
3669 // (If hr-seeks are used, it's better to skip this, as it would only mean
3670 // that more audio data than necessary would have to be decoded.)
adjust_cache_seek_target(struct demux_internal * in,struct demux_cached_range * range,double * pts,int * flags)3671 static void adjust_cache_seek_target(struct demux_internal *in,
3672 struct demux_cached_range *range,
3673 double *pts, int *flags)
3674 {
3675 if (*flags & SEEK_HR)
3676 return;
3677
3678 for (int n = 0; n < in->num_streams; n++) {
3679 struct demux_stream *ds = in->streams[n]->ds;
3680 struct demux_queue *queue = range->streams[n];
3681 if (ds->selected && ds->type == STREAM_VIDEO) {
3682 struct demux_packet *target = find_seek_target(queue, *pts, *flags);
3683 if (target) {
3684 double target_pts;
3685 compute_keyframe_times(target, &target_pts, NULL);
3686 if (target_pts != MP_NOPTS_VALUE) {
3687 MP_VERBOSE(in, "adjust seek target %f -> %f\n",
3688 *pts, target_pts);
3689 // (We assume the find_seek_target() call will return
3690 // the same target for the video stream.)
3691 *pts = target_pts;
3692 *flags &= ~SEEK_FORWARD;
3693 }
3694 }
3695 break;
3696 }
3697 }
3698 }
3699
3700 // must be called locked
3701 // range must be non-NULL and from find_cache_seek_range() using the same pts
3702 // and flags, before any other changes to the cached state
execute_cache_seek(struct demux_internal * in,struct demux_cached_range * range,double pts,int flags)3703 static void execute_cache_seek(struct demux_internal *in,
3704 struct demux_cached_range *range,
3705 double pts, int flags)
3706 {
3707 adjust_cache_seek_target(in, range, &pts, &flags);
3708
3709 for (int n = 0; n < in->num_streams; n++) {
3710 struct demux_stream *ds = in->streams[n]->ds;
3711 struct demux_queue *queue = range->streams[n];
3712
3713 struct demux_packet *target = find_seek_target(queue, pts, flags);
3714 ds->reader_head = target;
3715 ds->skip_to_keyframe = !target;
3716 if (ds->reader_head)
3717 ds->base_ts = MP_PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts);
3718
3719 MP_VERBOSE(in, "seeking stream %d (%s) to ",
3720 n, stream_type_name(ds->type));
3721
3722 if (target) {
3723 MP_VERBOSE(in, "packet %f/%f\n", target->pts, target->dts);
3724 } else {
3725 MP_VERBOSE(in, "nothing\n");
3726 }
3727 }
3728
3729 // If we seek to another range, we want to seek the low level demuxer to
3730 // there as well, because reader and demuxer queue must be the same.
3731 if (in->current_range != range) {
3732 switch_current_range(in, range);
3733
3734 in->seeking = true;
3735 in->seek_flags = SEEK_HR;
3736 in->seek_pts = range->seek_end - 1.0;
3737
3738 // When new packets are being appended, they could overlap with the old
3739 // range due to demuxer seek imprecisions, or because the queue contains
3740 // packets past the seek target but before the next seek target. Don't
3741 // append them twice, instead skip them until new packets are found.
3742 for (int n = 0; n < in->num_streams; n++) {
3743 struct demux_stream *ds = in->streams[n]->ds;
3744
3745 ds->refreshing = ds->selected;
3746 }
3747
3748 MP_VERBOSE(in, "resuming demuxer to end of cached range\n");
3749 }
3750 }
3751
3752 // Create a new blank cache range, and backup the old one. If the seekable
3753 // demuxer cache is disabled, merely reset the current range to a blank state.
switch_to_fresh_cache_range(struct demux_internal * in)3754 static void switch_to_fresh_cache_range(struct demux_internal *in)
3755 {
3756 if (!in->seekable_cache && in->current_range) {
3757 clear_cached_range(in, in->current_range);
3758 return;
3759 }
3760
3761 struct demux_cached_range *range = talloc_ptrtype(NULL, range);
3762 *range = (struct demux_cached_range){
3763 .seek_start = MP_NOPTS_VALUE,
3764 .seek_end = MP_NOPTS_VALUE,
3765 };
3766 MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
3767 add_missing_streams(in, range);
3768
3769 switch_current_range(in, range);
3770 }
3771
// Queue a seek on the user-side demuxer handle (demuxer must be in->d_user).
// Unless SEEK_FACTOR is set, seek_pts is shifted by -in->ts_offset before
// being passed to queue_seek(). The demuxer wakeup condition is signalled
// whether or not the seek was accepted.
// Returns queue_seek()'s result as int (nonzero if the seek was queued).
int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
{
    struct demux_internal *in = demuxer->in;
    assert(demuxer == in->d_user);

    pthread_mutex_lock(&in->lock);

    double target = seek_pts;
    if ((flags & SEEK_FACTOR) == 0)
        target = MP_ADD_PTS(seek_pts, -in->ts_offset);

    int res = queue_seek(in, target, flags, true);

    pthread_cond_signal(&in->wakeup);
    pthread_mutex_unlock(&in->lock);

    return res;
}
3789
queue_seek(struct demux_internal * in,double seek_pts,int flags,bool clear_back_state)3790 static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
3791 bool clear_back_state)
3792 {
3793 if (seek_pts == MP_NOPTS_VALUE)
3794 return false;
3795
3796 MP_VERBOSE(in, "queuing seek to %f%s\n", seek_pts,
3797 in->seeking ? " (cascade)" : "");
3798
3799 bool require_cache = flags & SEEK_CACHED;
3800 flags &= ~(unsigned)SEEK_CACHED;
3801
3802 bool set_backwards = flags & SEEK_SATAN;
3803 flags &= ~(unsigned)SEEK_SATAN;
3804
3805 bool force_seek = flags & SEEK_FORCE;
3806 flags &= ~(unsigned)SEEK_FORCE;
3807
3808 bool block = flags & SEEK_BLOCK;
3809 flags &= ~(unsigned)SEEK_BLOCK;
3810
3811 struct demux_cached_range *cache_target =
3812 find_cache_seek_range(in, seek_pts, flags);
3813
3814 if (!cache_target) {
3815 if (require_cache) {
3816 MP_VERBOSE(in, "Cached seek not possible.\n");
3817 return false;
3818 }
3819 if (!in->d_thread->seekable && !force_seek) {
3820 MP_WARN(in, "Cannot seek in this file.\n");
3821 return false;
3822 }
3823 }
3824
3825 in->eof = false;
3826 in->reading = false;
3827 in->back_demuxing = set_backwards;
3828
3829 clear_reader_state(in, clear_back_state);
3830
3831 in->blocked = block;
3832
3833 if (cache_target) {
3834 execute_cache_seek(in, cache_target, seek_pts, flags);
3835 } else {
3836 switch_to_fresh_cache_range(in);
3837
3838 in->seeking = true;
3839 in->seek_flags = flags;
3840 in->seek_pts = seek_pts;
3841 }
3842
3843 for (int n = 0; n < in->num_streams; n++) {
3844 struct demux_stream *ds = in->streams[n]->ds;
3845
3846 if (in->back_demuxing) {
3847 if (ds->back_seek_pos == MP_NOPTS_VALUE)
3848 ds->back_seek_pos = seek_pts;
3849 // Process possibly cached packets.
3850 back_demux_see_packets(in->streams[n]->ds);
3851 }
3852
3853 wakeup_ds(ds);
3854 }
3855
3856 if (!in->threading && in->seeking)
3857 execute_seek(in);
3858
3859 return true;
3860 }
3861
demuxer_stream_by_demuxer_id(struct demuxer * d,enum stream_type t,int id)3862 struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d,
3863 enum stream_type t, int id)
3864 {
3865 if (id < 0)
3866 return NULL;
3867 int num = demux_get_num_stream(d);
3868 for (int n = 0; n < num; n++) {
3869 struct sh_stream *s = demux_get_stream(d, n);
3870 if (s->type == t && s->demuxer_id == id)
3871 return s;
3872 }
3873 return NULL;
3874 }
3875
3876 // An obscure mechanism to get stream switching to be executed "faster" (as
3877 // perceived by the user), by making the stream return packets from the
3878 // current position
3879 // On a switch, it seeks back, and then grabs all packets that were
3880 // "missing" from the packet queue of the newly selected stream.
initiate_refresh_seek(struct demux_internal * in,struct demux_stream * stream,double start_ts)3881 static void initiate_refresh_seek(struct demux_internal *in,
3882 struct demux_stream *stream,
3883 double start_ts)
3884 {
3885 struct demuxer *demux = in->d_thread;
3886 bool seekable = demux->desc->seek && demux->seekable &&
3887 !demux->partially_seekable;
3888
3889 bool normal_seek = true;
3890 bool refresh_possible = true;
3891 for (int n = 0; n < in->num_streams; n++) {
3892 struct demux_stream *ds = in->streams[n]->ds;
3893
3894 if (!ds->selected)
3895 continue;
3896
3897 if (ds->type == STREAM_VIDEO || ds->type == STREAM_AUDIO)
3898 start_ts = MP_PTS_MIN(start_ts, ds->base_ts);
3899
3900 // If there were no other streams selected, we can use a normal seek.
3901 normal_seek &= stream == ds;
3902
3903 refresh_possible &= ds->queue->correct_dts || ds->queue->correct_pos;
3904 }
3905
3906 if (start_ts == MP_NOPTS_VALUE || !seekable)
3907 return;
3908
3909 if (!normal_seek) {
3910 if (!refresh_possible) {
3911 MP_VERBOSE(in, "can't issue refresh seek\n");
3912 return;
3913 }
3914
3915 for (int n = 0; n < in->num_streams; n++) {
3916 struct demux_stream *ds = in->streams[n]->ds;
3917
3918 bool correct_pos = ds->queue->correct_pos;
3919 bool correct_dts = ds->queue->correct_dts;
3920
3921 // We need to re-read all packets anyway, so discard the buffered
3922 // data. (In theory, we could keep the packets, and be able to use
3923 // it for seeking if partially read streams are deselected again,
3924 // but this causes other problems like queue overflows when
3925 // selecting a new stream.)
3926 ds_clear_reader_queue_state(ds);
3927 clear_queue(ds->queue);
3928
3929 // Streams which didn't have any packets yet will return all packets,
3930 // other streams return packets only starting from the last position.
3931 if (ds->selected && (ds->last_ret_pos != -1 ||
3932 ds->last_ret_dts != MP_NOPTS_VALUE))
3933 {
3934 ds->refreshing = true;
3935 ds->queue->correct_dts = correct_dts;
3936 ds->queue->correct_pos = correct_pos;
3937 ds->queue->last_pos = ds->last_ret_pos;
3938 ds->queue->last_dts = ds->last_ret_dts;
3939 }
3940
3941 update_seek_ranges(in->current_range);
3942 }
3943
3944 start_ts -= 1.0; // small offset to get correct overlap
3945 }
3946
3947 MP_VERBOSE(in, "refresh seek to %f\n", start_ts);
3948 in->seeking = true;
3949 in->seek_flags = SEEK_HR;
3950 in->seek_pts = start_ts;
3951 }
3952
3953 // Set whether the given stream should return packets.
3954 // ref_pts is used only if the stream is enabled. Then it serves as approximate
3955 // start pts for this stream (in the worst case it is ignored).
demuxer_select_track(struct demuxer * demuxer,struct sh_stream * stream,double ref_pts,bool selected)3956 void demuxer_select_track(struct demuxer *demuxer, struct sh_stream *stream,
3957 double ref_pts, bool selected)
3958 {
3959 struct demux_internal *in = demuxer->in;
3960 struct demux_stream *ds = stream->ds;
3961 pthread_mutex_lock(&in->lock);
3962 ref_pts = MP_ADD_PTS(ref_pts, -in->ts_offset);
3963 // don't flush buffers if stream is already selected / unselected
3964 if (ds->selected != selected) {
3965 MP_VERBOSE(in, "%sselect track %d\n", selected ? "" : "de", stream->index);
3966 ds->selected = selected;
3967 update_stream_selection_state(in, ds);
3968 in->tracks_switched = true;
3969 if (ds->selected) {
3970 if (in->back_demuxing)
3971 ds->back_seek_pos = ref_pts;
3972 if (!in->after_seek)
3973 initiate_refresh_seek(in, ds, ref_pts);
3974 }
3975 if (in->threading) {
3976 pthread_cond_signal(&in->wakeup);
3977 } else {
3978 execute_trackswitch(in);
3979 }
3980 }
3981 pthread_mutex_unlock(&in->lock);
3982 }
3983
3984 // Execute a refresh seek on the given stream.
3985 // ref_pts has the same meaning as with demuxer_select_track()
demuxer_refresh_track(struct demuxer * demuxer,struct sh_stream * stream,double ref_pts)3986 void demuxer_refresh_track(struct demuxer *demuxer, struct sh_stream *stream,
3987 double ref_pts)
3988 {
3989 struct demux_internal *in = demuxer->in;
3990 struct demux_stream *ds = stream->ds;
3991 pthread_mutex_lock(&in->lock);
3992 ref_pts = MP_ADD_PTS(ref_pts, -in->ts_offset);
3993 if (ds->selected) {
3994 MP_VERBOSE(in, "refresh track %d\n", stream->index);
3995 update_stream_selection_state(in, ds);
3996 if (in->back_demuxing)
3997 ds->back_seek_pos = ref_pts;
3998 if (!in->after_seek)
3999 initiate_refresh_seek(in, ds, ref_pts);
4000 }
4001 pthread_mutex_unlock(&in->lock);
4002 }
4003
4004 // This is for demuxer implementations only. demuxer_select_track() sets the
4005 // logical state, while this function returns the actual state (in case the
4006 // demuxer attempts to cache even unselected packets for track switching - this
4007 // will potentially be done in the future).
demux_stream_is_selected(struct sh_stream * stream)4008 bool demux_stream_is_selected(struct sh_stream *stream)
4009 {
4010 if (!stream)
4011 return false;
4012 bool r = false;
4013 pthread_mutex_lock(&stream->ds->in->lock);
4014 r = stream->ds->selected;
4015 pthread_mutex_unlock(&stream->ds->in->lock);
4016 return r;
4017 }
4018
demux_set_stream_wakeup_cb(struct sh_stream * sh,void (* cb)(void * ctx),void * ctx)4019 void demux_set_stream_wakeup_cb(struct sh_stream *sh,
4020 void (*cb)(void *ctx), void *ctx)
4021 {
4022 pthread_mutex_lock(&sh->ds->in->lock);
4023 sh->ds->wakeup_cb = cb;
4024 sh->ds->wakeup_cb_ctx = ctx;
4025 sh->ds->need_wakeup = true;
4026 pthread_mutex_unlock(&sh->ds->in->lock);
4027 }
4028
// Append an attachment to the demuxer. name, type and the data_size bytes at
// data are duplicated, with the attachments array as allocation parent.
// The array is grown in steps of 32 entries.
// Returns the index of the new attachment.
int demuxer_add_attachment(demuxer_t *demuxer, char *name, char *type,
                           void *data, size_t data_size)
{
    // Array is full (or not allocated yet): make room for 32 more entries.
    if (demuxer->num_attachments % 32 == 0) {
        demuxer->attachments = talloc_realloc(demuxer, demuxer->attachments,
                                              struct demux_attachment,
                                              demuxer->num_attachments + 32);
    }

    struct demux_attachment *slot =
        &demuxer->attachments[demuxer->num_attachments];
    slot->name = talloc_strdup(demuxer->attachments, name);
    slot->type = talloc_strdup(demuxer->attachments, type);
    slot->data = talloc_memdup(demuxer->attachments, data, data_size);
    slot->data_size = data_size;

    demuxer->num_attachments++;
    return demuxer->num_attachments - 1;
}
4045
chapter_compare(const void * p1,const void * p2)4046 static int chapter_compare(const void *p1, const void *p2)
4047 {
4048 struct demux_chapter *c1 = (void *)p1;
4049 struct demux_chapter *c2 = (void *)p2;
4050
4051 if (c1->pts > c2->pts)
4052 return 1;
4053 else if (c1->pts < c2->pts)
4054 return -1;
4055 return c1->original_index > c2->original_index ? 1 :-1; // never equal
4056 }
4057
// Sort the demuxer's chapter array in place using chapter_compare().
// Nothing is done when there are no chapters.
static void demuxer_sort_chapters(demuxer_t *demuxer)
{
    if (!demuxer->num_chapters)
        return;

    qsort(demuxer->chapters, demuxer->num_chapters,
          sizeof(demuxer->chapters[0]), chapter_compare);
}
4065
// Append a chapter starting at pts. A fresh tag list is allocated for it and
// its "TITLE" tag is set to name. original_index records the position at
// which the chapter was added.
// Returns the index of the new chapter in demuxer->chapters.
int demuxer_add_chapter(demuxer_t *demuxer, char *name,
                        double pts, uint64_t demuxer_id)
{
    struct demux_chapter chapter = {
        .original_index = demuxer->num_chapters,
        .pts = pts,
        .metadata = talloc_zero(demuxer, struct mp_tags),
        .demuxer_id = demuxer_id,
    };

    mp_tags_set_str(chapter.metadata, "TITLE", name);
    MP_TARRAY_APPEND(demuxer, demuxer->chapters, demuxer->num_chapters,
                     chapter);

    return demuxer->num_chapters - 1;
}
4079
4080 // Disallow reading any packets and make readers think there is no new data
4081 // yet, until a seek is issued.
demux_block_reading(struct demuxer * demuxer,bool block)4082 void demux_block_reading(struct demuxer *demuxer, bool block)
4083 {
4084 struct demux_internal *in = demuxer->in;
4085 assert(demuxer == in->d_user);
4086
4087 pthread_mutex_lock(&in->lock);
4088 in->blocked = block;
4089 for (int n = 0; n < in->num_streams; n++) {
4090 in->streams[n]->ds->need_wakeup = true;
4091 wakeup_ds(in->streams[n]->ds);
4092 }
4093 pthread_cond_signal(&in->wakeup);
4094 pthread_mutex_unlock(&in->lock);
4095 }
4096
update_bytes_read(struct demux_internal * in)4097 static void update_bytes_read(struct demux_internal *in)
4098 {
4099 struct demuxer *demuxer = in->d_thread;
4100
4101 int64_t new = in->slave_unbuffered_read_bytes;
4102 in->slave_unbuffered_read_bytes = 0;
4103
4104 int64_t new_seeks = 0;
4105
4106 struct stream *stream = demuxer->stream;
4107 if (stream) {
4108 new += stream->total_unbuffered_read_bytes;
4109 stream->total_unbuffered_read_bytes = 0;
4110 new_seeks += stream->total_stream_seeks;
4111 stream->total_stream_seeks = 0;
4112 }
4113
4114 in->cache_unbuffered_read_bytes += new;
4115 in->hack_unbuffered_read_bytes += new;
4116 in->byte_level_seeks += new_seeks;
4117 }
4118
4119 // must be called locked, temporarily unlocks
update_cache(struct demux_internal * in)4120 static void update_cache(struct demux_internal *in)
4121 {
4122 struct demuxer *demuxer = in->d_thread;
4123 struct stream *stream = demuxer->stream;
4124
4125 int64_t now = mp_time_us();
4126 int64_t diff = now - in->last_speed_query;
4127 bool do_update = diff >= MP_SECOND_US;
4128
4129 // Don't lock while querying the stream.
4130 pthread_mutex_unlock(&in->lock);
4131
4132 int64_t stream_size = -1;
4133 struct mp_tags *stream_metadata = NULL;
4134 if (stream) {
4135 if (do_update)
4136 stream_size = stream_get_size(stream);
4137 stream_control(stream, STREAM_CTRL_GET_METADATA, &stream_metadata);
4138 }
4139
4140 pthread_mutex_lock(&in->lock);
4141
4142 update_bytes_read(in);
4143
4144 if (do_update)
4145 in->stream_size = stream_size;
4146 if (stream_metadata) {
4147 add_timed_metadata(in, stream_metadata, NULL, MP_NOPTS_VALUE);
4148 talloc_free(stream_metadata);
4149 }
4150
4151 in->next_cache_update = INT64_MAX;
4152
4153 if (do_update) {
4154 uint64_t bytes = in->cache_unbuffered_read_bytes;
4155 in->cache_unbuffered_read_bytes = 0;
4156 in->last_speed_query = now;
4157 double speed = bytes / (diff / (double)MP_SECOND_US);
4158 in->bytes_per_second = 0.5 * in->speed_query_prev_sample +
4159 0.5 * speed;
4160 in->speed_query_prev_sample = speed;
4161 }
4162 // The idea is to update as long as there is "activity".
4163 if (in->bytes_per_second)
4164 in->next_cache_update = now + MP_SECOND_US + 1;
4165 }
4166
dumper_close(struct demux_internal * in)4167 static void dumper_close(struct demux_internal *in)
4168 {
4169 if (in->dumper)
4170 mp_recorder_destroy(in->dumper);
4171 in->dumper = NULL;
4172 if (in->dumper_status == CONTROL_TRUE)
4173 in->dumper_status = CONTROL_FALSE; // make abort equal to success
4174 }
4175
range_time_compare(const void * p1,const void * p2)4176 static int range_time_compare(const void *p1, const void *p2)
4177 {
4178 struct demux_cached_range *r1 = *((struct demux_cached_range **)p1);
4179 struct demux_cached_range *r2 = *((struct demux_cached_range **)p2);
4180
4181 if (r1->seek_start == r2->seek_start)
4182 return 0;
4183 return r1->seek_start < r2->seek_start ? -1 : 1;
4184 }
4185
dump_cache(struct demux_internal * in,double start,double end)4186 static void dump_cache(struct demux_internal *in, double start, double end)
4187 {
4188 in->dumper_status = in->dumper ? CONTROL_TRUE : CONTROL_ERROR;
4189 if (!in->dumper)
4190 return;
4191
4192 // (only in pathological cases there might be more ranges than allowed)
4193 struct demux_cached_range *ranges[MAX_SEEK_RANGES];
4194 int num_ranges = 0;
4195 for (int n = 0; n < MPMIN(MP_ARRAY_SIZE(ranges), in->num_ranges); n++)
4196 ranges[num_ranges++] = in->ranges[n];
4197 qsort(ranges, num_ranges, sizeof(ranges[0]), range_time_compare);
4198
4199 for (int n = 0; n < num_ranges; n++) {
4200 struct demux_cached_range *r = ranges[n];
4201 if (r->seek_start == MP_NOPTS_VALUE)
4202 continue;
4203 if (r->seek_end <= start)
4204 continue;
4205 if (end != MP_NOPTS_VALUE && r->seek_start >= end)
4206 continue;
4207
4208 mp_recorder_mark_discontinuity(in->dumper);
4209
4210 double pts = start;
4211 int flags = 0;
4212 adjust_cache_seek_target(in, r, &pts, &flags);
4213
4214 for (int i = 0; i < r->num_streams; i++) {
4215 struct demux_queue *q = r->streams[i];
4216 struct demux_stream *ds = q->ds;
4217
4218 ds->dump_pos = find_seek_target(q, pts, flags);
4219 }
4220
4221 // We need to reinterleave the separate streams somehow, which makes
4222 // everything more complex.
4223 while (1) {
4224 struct demux_packet *next = NULL;
4225 double next_dts = MP_NOPTS_VALUE;
4226
4227 for (int i = 0; i < r->num_streams; i++) {
4228 struct demux_stream *ds = r->streams[i]->ds;
4229 struct demux_packet *dp = ds->dump_pos;
4230
4231 if (!dp)
4232 continue;
4233 assert(dp->stream == ds->index);
4234
4235 double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts);
4236
4237 // Check for stream EOF. Note that we don't try to EOF
4238 // streams at the same point (e.g. video can take longer
4239 // to finish than audio, so the output file will have no
4240 // audio for the last part of the video). Too much effort.
4241 if (pdts != MP_NOPTS_VALUE && end != MP_NOPTS_VALUE &&
4242 pdts >= end && dp->keyframe)
4243 {
4244 ds->dump_pos = NULL;
4245 continue;
4246 }
4247
4248 if (pdts == MP_NOPTS_VALUE || next_dts == MP_NOPTS_VALUE ||
4249 pdts < next_dts)
4250 {
4251 next_dts = pdts;
4252 next = dp;
4253 }
4254 }
4255
4256 if (!next)
4257 break;
4258
4259 struct demux_stream *ds = in->streams[next->stream]->ds;
4260 ds->dump_pos = next->next;
4261
4262 struct demux_packet *dp = read_packet_from_cache(in, next);
4263 if (!dp) {
4264 in->dumper_status = CONTROL_ERROR;
4265 break;
4266 }
4267
4268 write_dump_packet(in, dp);
4269
4270 talloc_free(dp);
4271 }
4272
4273 if (in->dumper_status != CONTROL_OK)
4274 break;
4275 }
4276
4277 // (strictly speaking unnecessary; for clarity)
4278 for (int n = 0; n < in->num_streams; n++)
4279 in->streams[n]->ds->dump_pos = NULL;
4280
4281 // If dumping (in end==NOPTS mode) doesn't continue at the range that
4282 // was written last, we have a discontinuity.
4283 if (num_ranges && ranges[num_ranges - 1] != in->current_range)
4284 mp_recorder_mark_discontinuity(in->dumper);
4285
4286 // end=NOPTS means the demuxer output continues to be written to the
4287 // dump file.
4288 if (end != MP_NOPTS_VALUE || in->dumper_status != CONTROL_OK)
4289 dumper_close(in);
4290 }
4291
4292 // Set the current cache dumping mode. There is only at most 1 dump process
4293 // active, so calling this aborts the previous dumping. Passing file==NULL
4294 // stops dumping.
4295 // This is synchronous with demux_cache_dump_get_status() (i.e. starting or
4296 // aborting is not asynchronous). On status change, the demuxer wakeup callback
4297 // is invoked (except for this call).
4298 // Returns whether dumping was logically started.
demux_cache_dump_set(struct demuxer * demuxer,double start,double end,char * file)4299 bool demux_cache_dump_set(struct demuxer *demuxer, double start, double end,
4300 char *file)
4301 {
4302 struct demux_internal *in = demuxer->in;
4303 assert(demuxer == in->d_user);
4304
4305 bool res = false;
4306
4307 pthread_mutex_lock(&in->lock);
4308
4309 start = MP_ADD_PTS(start, -in->ts_offset);
4310 end = MP_ADD_PTS(end, -in->ts_offset);
4311
4312 dumper_close(in);
4313
4314 if (file && file[0] && start != MP_NOPTS_VALUE) {
4315 res = true;
4316
4317 in->dumper = recorder_create(in, file);
4318
4319 // This is not asynchronous and will freeze the shit for a while if the
4320 // user is unlucky. It could be moved to a thread with some effort.
4321 // General idea: iterate over all cache ranges, dump what intersects.
4322 // After that, and if the user requested it, make it dump all newly
4323 // received packets, even if it's awkward (consider the case if the
4324 // current range is not the last range).
4325 dump_cache(in, start, end);
4326 }
4327
4328 pthread_mutex_unlock(&in->lock);
4329
4330 return res;
4331 }
4332
4333 // Returns one of CONTROL_*. CONTROL_TRUE means dumping is in progress.
demux_cache_dump_get_status(struct demuxer * demuxer)4334 int demux_cache_dump_get_status(struct demuxer *demuxer)
4335 {
4336 struct demux_internal *in = demuxer->in;
4337 pthread_mutex_lock(&in->lock);
4338 int status = in->dumper_status;
4339 pthread_mutex_unlock(&in->lock);
4340 return status;
4341 }
4342
4343 // Return what range demux_cache_dump_set() would (probably) yield. This is a
4344 // conservative amount (in addition to internal consistency of this code, it
4345 // depends on what a player will do with the resulting file).
4346 // Use for_end==true to get the end of dumping, other the start.
4347 // Returns NOPTS if nothing was found.
demux_probe_cache_dump_target(struct demuxer * demuxer,double pts,bool for_end)4348 double demux_probe_cache_dump_target(struct demuxer *demuxer, double pts,
4349 bool for_end)
4350 {
4351 struct demux_internal *in = demuxer->in;
4352 assert(demuxer == in->d_user);
4353
4354 double res = MP_NOPTS_VALUE;
4355 if (pts == MP_NOPTS_VALUE)
4356 return pts;
4357
4358 pthread_mutex_lock(&in->lock);
4359
4360 pts = MP_ADD_PTS(pts, -in->ts_offset);
4361
4362 // (When determining the end, look before the keyframe at pts, so subtract
4363 // an arbitrary amount to round down.)
4364 double seek_pts = for_end ? pts - 0.001 : pts;
4365 int flags = 0;
4366 struct demux_cached_range *r = find_cache_seek_range(in, seek_pts, flags);
4367 if (r) {
4368 if (!for_end)
4369 adjust_cache_seek_target(in, r, &pts, &flags);
4370
4371 double t[STREAM_TYPE_COUNT];
4372 for (int n = 0; n < STREAM_TYPE_COUNT; n++)
4373 t[n] = MP_NOPTS_VALUE;
4374
4375 for (int n = 0; n < in->num_streams; n++) {
4376 struct demux_stream *ds = in->streams[n]->ds;
4377 struct demux_queue *q = r->streams[n];
4378
4379 struct demux_packet *dp = find_seek_target(q, pts, flags);
4380 if (dp) {
4381 if (for_end) {
4382 while (dp) {
4383 double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts);
4384
4385 if (pdts != MP_NOPTS_VALUE && pdts >= pts && dp->keyframe)
4386 break;
4387
4388 t[ds->type] = MP_PTS_MAX(t[ds->type], pdts);
4389
4390 dp = dp->next;
4391 }
4392 } else {
4393 double start;
4394 compute_keyframe_times(dp, &start, NULL);
4395 start = MP_PTS_MAX(start, r->seek_start);
4396 t[ds->type] = MP_PTS_MAX(t[ds->type], start);
4397 }
4398 }
4399 }
4400
4401 res = t[STREAM_VIDEO];
4402 if (res == MP_NOPTS_VALUE)
4403 res = t[STREAM_AUDIO];
4404 if (res == MP_NOPTS_VALUE) {
4405 for (int n = 0; n < STREAM_TYPE_COUNT; n++) {
4406 res = t[n];
4407 if (res != MP_NOPTS_VALUE)
4408 break;
4409 }
4410 }
4411 }
4412
4413 res = MP_ADD_PTS(res, in->ts_offset);
4414
4415 pthread_mutex_unlock(&in->lock);
4416
4417 return res;
4418 }
4419
4420 // Used by demuxers to report the amount of transferred bytes. This is for
4421 // streams which circumvent demuxer->stream (stream statistics are handled by
4422 // demux.c itself).
demux_report_unbuffered_read_bytes(struct demuxer * demuxer,int64_t new)4423 void demux_report_unbuffered_read_bytes(struct demuxer *demuxer, int64_t new)
4424 {
4425 struct demux_internal *in = demuxer->in;
4426 assert(demuxer == in->d_thread);
4427
4428 in->slave_unbuffered_read_bytes += new;
4429 }
4430
4431 // Return bytes read since last query. It's a hack because it works only if
4432 // the demuxer thread is disabled.
demux_get_bytes_read_hack(struct demuxer * demuxer)4433 int64_t demux_get_bytes_read_hack(struct demuxer *demuxer)
4434 {
4435 struct demux_internal *in = demuxer->in;
4436
4437 // Required because demuxer==in->d_user, and we access in->d_thread.
4438 // Locking won't solve this, because we also need to access struct stream.
4439 assert(!in->threading);
4440
4441 update_bytes_read(in);
4442
4443 int64_t res = in->hack_unbuffered_read_bytes;
4444 in->hack_unbuffered_read_bytes = 0;
4445 return res;
4446 }
4447
demux_get_bitrate_stats(struct demuxer * demuxer,double * rates)4448 void demux_get_bitrate_stats(struct demuxer *demuxer, double *rates)
4449 {
4450 struct demux_internal *in = demuxer->in;
4451 assert(demuxer == in->d_user);
4452
4453 pthread_mutex_lock(&in->lock);
4454
4455 for (int n = 0; n < STREAM_TYPE_COUNT; n++)
4456 rates[n] = -1;
4457 for (int n = 0; n < in->num_streams; n++) {
4458 struct demux_stream *ds = in->streams[n]->ds;
4459 if (ds->selected && ds->bitrate >= 0)
4460 rates[ds->type] = MPMAX(0, rates[ds->type]) + ds->bitrate;
4461 }
4462
4463 pthread_mutex_unlock(&in->lock);
4464 }
4465
demux_get_reader_state(struct demuxer * demuxer,struct demux_reader_state * r)4466 void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *r)
4467 {
4468 struct demux_internal *in = demuxer->in;
4469 assert(demuxer == in->d_user);
4470
4471 pthread_mutex_lock(&in->lock);
4472
4473 *r = (struct demux_reader_state){
4474 .eof = in->eof,
4475 .ts_reader = MP_NOPTS_VALUE,
4476 .ts_end = MP_NOPTS_VALUE,
4477 .ts_duration = -1,
4478 .total_bytes = in->total_bytes,
4479 .seeking = in->seeking_in_progress,
4480 .low_level_seeks = in->low_level_seeks,
4481 .ts_last = in->demux_ts,
4482 .bytes_per_second = in->bytes_per_second,
4483 .byte_level_seeks = in->byte_level_seeks,
4484 .file_cache_bytes = in->cache ? demux_cache_get_size(in->cache) : -1,
4485 };
4486 bool any_packets = false;
4487 for (int n = 0; n < in->num_streams; n++) {
4488 struct demux_stream *ds = in->streams[n]->ds;
4489 if (ds->eager && !(!ds->queue->head && ds->eof) && !ds->ignore_eof) {
4490 r->underrun |= !ds->reader_head && !ds->eof && !ds->still_image;
4491 r->ts_reader = MP_PTS_MAX(r->ts_reader, ds->base_ts);
4492 r->ts_end = MP_PTS_MAX(r->ts_end, ds->queue->last_ts);
4493 any_packets |= !!ds->reader_head;
4494 }
4495 r->fw_bytes += get_foward_buffered_bytes(ds);
4496 }
4497 r->idle = (!in->reading && !r->underrun) || r->eof;
4498 r->underrun &= !r->idle && in->threading;
4499 r->ts_reader = MP_ADD_PTS(r->ts_reader, in->ts_offset);
4500 r->ts_end = MP_ADD_PTS(r->ts_end, in->ts_offset);
4501 if (r->ts_reader != MP_NOPTS_VALUE && r->ts_reader <= r->ts_end)
4502 r->ts_duration = r->ts_end - r->ts_reader;
4503 if (in->seeking || !any_packets)
4504 r->ts_duration = 0;
4505 for (int n = 0; n < MPMIN(in->num_ranges, MAX_SEEK_RANGES); n++) {
4506 struct demux_cached_range *range = in->ranges[n];
4507 if (range->seek_start != MP_NOPTS_VALUE) {
4508 r->seek_ranges[r->num_seek_ranges++] =
4509 (struct demux_seek_range){
4510 .start = MP_ADD_PTS(range->seek_start, in->ts_offset),
4511 .end = MP_ADD_PTS(range->seek_end, in->ts_offset),
4512 };
4513 r->bof_cached |= range->is_bof;
4514 r->eof_cached |= range->is_eof;
4515 }
4516 }
4517
4518 pthread_mutex_unlock(&in->lock);
4519 }
4520
demux_cancel_test(struct demuxer * demuxer)4521 bool demux_cancel_test(struct demuxer *demuxer)
4522 {
4523 return mp_cancel_test(demuxer->cancel);
4524 }
4525
demux_copy_chapter_data(struct demux_chapter * c,int num)4526 struct demux_chapter *demux_copy_chapter_data(struct demux_chapter *c, int num)
4527 {
4528 struct demux_chapter *new = talloc_array(NULL, struct demux_chapter, num);
4529 for (int n = 0; n < num; n++) {
4530 new[n] = c[n];
4531 new[n].metadata = mp_tags_dup(new, new[n].metadata);
4532 }
4533 return new;
4534 }
4535
visit_tags(void * ctx,void (* visit)(void * ctx,void * ta,char ** s),struct mp_tags * tags)4536 static void visit_tags(void *ctx, void (*visit)(void *ctx, void *ta, char **s),
4537 struct mp_tags *tags)
4538 {
4539 for (int n = 0; n < (tags ? tags->num_keys : 0); n++)
4540 visit(ctx, tags, &tags->values[n]);
4541 }
4542
visit_meta(struct demuxer * demuxer,void * ctx,void (* visit)(void * ctx,void * ta,char ** s))4543 static void visit_meta(struct demuxer *demuxer, void *ctx,
4544 void (*visit)(void *ctx, void *ta, char **s))
4545 {
4546 struct demux_internal *in = demuxer->in;
4547
4548 for (int n = 0; n < in->num_streams; n++) {
4549 struct sh_stream *sh = in->streams[n];
4550
4551 visit(ctx, sh, &sh->title);
4552 visit_tags(ctx, visit, sh->tags);
4553 }
4554
4555 for (int n = 0; n < demuxer->num_chapters; n++)
4556 visit_tags(ctx, visit, demuxer->chapters[n].metadata);
4557
4558 visit_tags(ctx, visit, demuxer->metadata);
4559 }
4560
4561
// visit_meta() callback: append the string plus a newline to the buffer that
// ctx (a char **) points to. NULL strings are skipped. ta is unused.
static void visit_detect(void *ctx, void *ta, char **s)
{
    char **buffer = ctx;

    if (!*s)
        return;

    *buffer = talloc_asprintf_append_buffer(*buffer, "%s\n", *s);
}
4569
visit_convert(void * ctx,void * ta,char ** s)4570 static void visit_convert(void *ctx, void *ta, char **s)
4571 {
4572 struct demuxer *demuxer = ctx;
4573 struct demux_internal *in = demuxer->in;
4574
4575 if (!*s)
4576 return;
4577
4578 bstr data = bstr0(*s);
4579 bstr conv = mp_iconv_to_utf8(in->log, data, in->meta_charset,
4580 MP_ICONV_VERBOSE);
4581 if (conv.start && conv.start != data.start) {
4582 char *ns = conv.start; // 0-termination is guaranteed
4583 // (The old string might not be an alloc, but if it is, it's a talloc
4584 // child, and will not leak, even if it stays allocated uselessly.)
4585 *s = ns;
4586 talloc_steal(ta, *s);
4587 }
4588 }
4589
demux_convert_tags_charset(struct demuxer * demuxer)4590 static void demux_convert_tags_charset(struct demuxer *demuxer)
4591 {
4592 struct demux_internal *in = demuxer->in;
4593
4594 char *cp = in->opts->meta_cp;
4595 if (!cp || mp_charset_is_utf8(cp))
4596 return;
4597
4598 char *data = talloc_strdup(NULL, "");
4599 visit_meta(demuxer, &data, visit_detect);
4600
4601 in->meta_charset = (char *)mp_charset_guess(in, in->log, bstr0(data), cp, 0);
4602 if (in->meta_charset && !mp_charset_is_utf8(in->meta_charset)) {
4603 MP_INFO(demuxer, "Using tag charset: %s\n", in->meta_charset);
4604 visit_meta(demuxer, demuxer, visit_convert);
4605 }
4606
4607 talloc_free(data);
4608 }
4609
get_demux_sub_opts(int index,const struct m_sub_options ** sub)4610 static bool get_demux_sub_opts(int index, const struct m_sub_options **sub)
4611 {
4612 if (!demuxer_list[index])
4613 return false;
4614 *sub = demuxer_list[index]->options;
4615 return true;
4616 }
4617