1 /*
2  * This file is part of mpv.
3  *
4  * mpv is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * mpv is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with mpv.  If not, see <http://www.gnu.org/licenses/>.
16  */
17 
18 #include <float.h>
19 #include <stdio.h>
20 #include <stdlib.h>
21 #include <string.h>
22 #include <assert.h>
23 #include <unistd.h>
24 #include <limits.h>
25 #include <pthread.h>
26 #include <stdint.h>
27 
28 #include <math.h>
29 
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 
33 #include "cache.h"
34 #include "config.h"
35 #include "options/m_config.h"
36 #include "options/m_option.h"
37 #include "mpv_talloc.h"
38 #include "common/av_common.h"
39 #include "common/msg.h"
40 #include "common/global.h"
41 #include "common/recorder.h"
42 #include "common/stats.h"
43 #include "misc/charset_conv.h"
44 #include "misc/thread_tools.h"
45 #include "osdep/atomic.h"
46 #include "osdep/timer.h"
47 #include "osdep/threads.h"
48 
49 #include "stream/stream.h"
50 #include "demux.h"
51 #include "timeline.h"
52 #include "stheader.h"
53 #include "cue.h"
54 
// Demuxer list
// Descriptors of the individual demuxer implementations. They are only
// declared here (extern). Note that demuxer_desc_rar and
// demuxer_desc_timeline are declared but are not entries of demuxer_list[].
extern const struct demuxer_desc demuxer_desc_edl;
extern const struct demuxer_desc demuxer_desc_cue;
extern const demuxer_desc_t demuxer_desc_rawaudio;
extern const demuxer_desc_t demuxer_desc_rawvideo;
extern const demuxer_desc_t demuxer_desc_mf;
extern const demuxer_desc_t demuxer_desc_matroska;
extern const demuxer_desc_t demuxer_desc_lavf;
extern const demuxer_desc_t demuxer_desc_playlist;
extern const demuxer_desc_t demuxer_desc_disc;
extern const demuxer_desc_t demuxer_desc_rar;
extern const demuxer_desc_t demuxer_desc_libarchive;
extern const demuxer_desc_t demuxer_desc_null;
extern const demuxer_desc_t demuxer_desc_timeline;

// NULL-terminated table of the available demuxers. The libarchive entry is
// compiled in only if HAVE_LIBARCHIVE is set.
// NOTE(review): the entry order presumably determines probing priority -
// confirm against the code that iterates this list before reordering.
static const demuxer_desc_t *const demuxer_list[] = {
    &demuxer_desc_disc,
    &demuxer_desc_edl,
    &demuxer_desc_cue,
    &demuxer_desc_rawaudio,
    &demuxer_desc_rawvideo,
    &demuxer_desc_matroska,
#if HAVE_LIBARCHIVE
    &demuxer_desc_libarchive,
#endif
    &demuxer_desc_lavf,
    &demuxer_desc_mf,
    &demuxer_desc_playlist,
    &demuxer_desc_null,
    NULL
};
86 
// Demuxer and cache related options. Every field is bound to an option in
// the demux_conf option table; the option name is noted next to each field.
struct demux_opts {
    int enable_cache;           // "cache" (no=0, auto=-1, yes=1)
    int disk_cache;             // "cache-on-disk" (flag)
    int64_t max_bytes;          // "demuxer-max-bytes"
    int64_t max_bytes_bw;       // "demuxer-max-back-bytes"
    int donate_fw;              // "demuxer-donate-buffer" (flag)
    double min_secs;            // "demuxer-readahead-secs"
    int force_seekable;         // "force-seekable" (flag)
    double min_secs_cache;      // "cache-secs"
    int access_references;      // "access-references" (flag)
    int seekable_cache;         // "demuxer-seekable-cache" (auto=-1, no=0, yes=1)
    int create_ccs;             // "sub-create-cc-track" (flag)
    char *record_file;          // "stream-record"
    int video_back_preroll;     // "video-backward-overlap" (auto=-1 or 0..1024)
    int audio_back_preroll;     // "audio-backward-overlap" (auto=-1 or 0..1024)
    // "video-backward-batch" / "audio-backward-batch", indexed by stream type
    int back_batch[STREAM_TYPE_COUNT];
    double back_seek_size;      // "demuxer-backward-playback-step"
    char *meta_cp;              // "metadata-codepage"
    int force_retry_eof;        // "demuxer-force-retry-on-eof" (deprecated)
};
107 
// The OPT_* macros below resolve field names relative to this struct.
#define OPT_BASE_STRUCT struct demux_opts

static bool get_demux_sub_opts(int index, const struct m_sub_options **sub);

// Option table for struct demux_opts: maps option names to struct fields,
// defines value ranges, and provides the default values.
const struct m_sub_options demux_conf = {
    .opts = (const struct m_option[]){
        {"cache", OPT_CHOICE(enable_cache,
            {"no", 0}, {"auto", -1}, {"yes", 1})},
        {"cache-on-disk", OPT_FLAG(disk_cache)},
        {"demuxer-readahead-secs", OPT_DOUBLE(min_secs), M_RANGE(0, DBL_MAX)},
        {"demuxer-max-bytes", OPT_BYTE_SIZE(max_bytes),
            M_RANGE(0, M_MAX_MEM_BYTES)},
        {"demuxer-max-back-bytes", OPT_BYTE_SIZE(max_bytes_bw),
            M_RANGE(0, M_MAX_MEM_BYTES)},
        {"demuxer-donate-buffer", OPT_FLAG(donate_fw)},
        {"force-seekable", OPT_FLAG(force_seekable)},
        {"cache-secs", OPT_DOUBLE(min_secs_cache), M_RANGE(0, DBL_MAX)},
        {"access-references", OPT_FLAG(access_references)},
        {"demuxer-seekable-cache", OPT_CHOICE(seekable_cache,
            {"auto", -1}, {"no", 0}, {"yes", 1})},
        {"sub-create-cc-track", OPT_FLAG(create_ccs)},
        {"stream-record", OPT_STRING(record_file)},
        // The two overlap options accept "auto" (-1) or a number in 0..1024.
        {"video-backward-overlap", OPT_CHOICE(video_back_preroll, {"auto", -1}),
            M_RANGE(0, 1024)},
        {"audio-backward-overlap", OPT_CHOICE(audio_back_preroll, {"auto", -1}),
            M_RANGE(0, 1024)},
        {"video-backward-batch", OPT_INT(back_batch[STREAM_VIDEO]),
            M_RANGE(0, 1024)},
        {"audio-backward-batch", OPT_INT(back_batch[STREAM_AUDIO]),
            M_RANGE(0, 1024)},
        {"demuxer-backward-playback-step", OPT_DOUBLE(back_seek_size),
            M_RANGE(0, DBL_MAX)},
        {"metadata-codepage", OPT_STRING(meta_cp)},
        {"demuxer-force-retry-on-eof", OPT_FLAG(force_retry_eof),
         .deprecation_message = "temporary debug option, no replacement"},
        {0}
    },
    .size = sizeof(struct demux_opts),
    // Fields not listed here are zero-initialized (compound literal).
    .defaults = &(const struct demux_opts){
        .enable_cache = -1, // auto
        .max_bytes = 150 * 1024 * 1024,     // 150 MiB
        .max_bytes_bw = 50 * 1024 * 1024,   // 50 MiB
        .donate_fw = 1,
        .min_secs = 1.0,
        .min_secs_cache = 1000.0 * 60 * 60, // 3600000, i.e. 1000 hours in secs
        .seekable_cache = -1,               // auto
        .access_references = 1,
        .video_back_preroll = -1,           // auto
        .audio_back_preroll = -1,           // auto
        .back_seek_size = 60,
        .back_batch = {
            [STREAM_VIDEO] = 1,
            [STREAM_AUDIO] = 10,
        },
        .meta_cp = "utf-8",
    },
    .get_sub_options = get_demux_sub_opts,
};
166 
// Internal demuxer state. It references both demuxer structs (d_thread and
// d_user) and holds the lock plus all state that is protected by it.
struct demux_internal {
    struct mp_log *log;
    struct mpv_global *global;
    struct stats_ctx *stats;

    bool can_cache;             // not a slave demuxer; caching makes sense
    bool can_record;            // stream recording is allowed

    // The demuxer runs potentially in another thread, so we keep two demuxer
    // structs; the real demuxer can access the shadow struct only.
    struct demuxer *d_thread;   // accessed by demuxer impl. (producer)
    struct demuxer *d_user;     // accessed by player (consumer)

    // The lock protects the packet queues (struct demux_stream),
    // and the fields below.
    pthread_mutex_t lock;
    pthread_cond_t wakeup;
    pthread_t thread;

    // -- All the following fields are protected by lock.

    struct demux_opts *opts;
    struct m_config_cache *opts_cache;

    bool thread_terminate;
    bool threading;
    bool shutdown_async;
    void (*wakeup_cb)(void *ctx);
    void *wakeup_cb_ctx;

    struct sh_stream **streams;
    int num_streams;

    char *meta_charset;

    // If non-NULL, a stream which is used for global (timed) metadata. It will
    // be an arbitrary stream, which hopefully will happen to work.
    struct sh_stream *metadata_stream;

    int events;

    struct demux_cache *cache;

    bool warned_queue_overflow;
    bool eof;                   // whether we're in EOF state
    double min_secs;
    size_t max_bytes;
    size_t max_bytes_bw;
    bool seekable_cache;
    bool using_network_cache_opts;
    char *record_filename;

    // Whether the demuxer thread should prefetch packets. This is set to false
    // if EOF was reached or the demuxer cache is full. This is also important
    // in the initial state: the decoder thread needs to select streams before
    // the first packet is read, so this is set to true by packet reading only.
    // Reset to false again on EOF or if prefetching is done.
    bool reading;

    // Set if we just performed a seek, without reading packets yet. Used to
    // avoid a redundant initial seek after enabling streams. We could just
    // allow it, but to avoid buggy seeking affecting normal playback, we don't.
    bool after_seek;
    // Set in addition to after_seek if we think we seeked to the start of the
    // file (or if the demuxer was just opened).
    bool after_seek_to_start;

    // Demuxing backwards. Since demuxer implementations don't support this
    // directly, it is emulated by seeking backwards for every packet run. Also,
    // packets between keyframes are demuxed forwards (you can't decode that
    // stuff otherwise), which adds complexity on top of it.
    bool back_demuxing;

    // For backward demuxing:
    bool need_back_seek;        // back-step seek needs to be triggered
    bool back_any_need_recheck; // at least 1 ds->back_need_recheck set

    bool tracks_switched;       // thread needs to inform demuxer of this

    bool seeking;               // there's a seek queued
    int seek_flags;             // flags for next seek (if seeking==true)
    double seek_pts;

    // (fields for debugging)
    double seeking_in_progress; // low level seek state
    int low_level_seeks;        // number of started low level seeks
    double demux_ts;            // last demuxed DTS or PTS

    double ts_offset;           // timestamp offset to apply to everything

    // (sorted by least recent use: index 0 is least recently used)
    struct demux_cached_range **ranges;
    int num_ranges;

    size_t total_bytes;         // total sum of packet data buffered
    // Range from which decoder is reading, and to which demuxer is appending.
    // This is normally never NULL. This is always ranges[num_ranges - 1].
    // This can be NULL during initialization or deinitialization.
    struct demux_cached_range *current_range;

    double highest_av_pts;      // highest non-subtitle PTS seen - for duration

    bool blocked;

    // Transient state.
    double duration;
    // Cached state.
    int64_t stream_size;
    int64_t last_speed_query;
    double speed_query_prev_sample;
    uint64_t bytes_per_second;
    int64_t next_cache_update;

    // demux user state (user thread, somewhat similar to reader/decoder state)
    double last_playback_pts;   // last playback_pts from demux_update()
    bool force_metadata_update;
    int cached_metadata_index;  // speed up repeated lookups

    struct mp_recorder *dumper;
    int dumper_status;

    bool owns_stream;

    // -- Access from demuxer thread only
    bool enable_recording;
    struct mp_recorder *recorder;
    int64_t slave_unbuffered_read_bytes; // value reported from demuxer impl.
    int64_t hack_unbuffered_read_bytes;  // for demux_get_bytes_read_hack()
    int64_t cache_unbuffered_read_bytes; // for demux_reader_state.bytes_per_second
    int64_t byte_level_seeks;            // for demux_reader_state.byte_level_seeks
};
298 
// A set of tags associated with a timestamp. struct demux_cached_range keeps
// an array of these (metadata[]); prune_metadata() compares pts against the
// range's seek_start to decide which entries can be dropped.
struct timed_metadata {
    double pts;             // timestamp this entry is associated with
    struct mp_tags *tags;
    // NOTE(review): presumably marks tags that came from a stream (as opposed
    // to demuxer-global tags) - confirm where this field is set.
    bool from_stream;
};
304 
// A continuous range of cached packets for all enabled streams.
// (One demux_queue for each known stream.)
struct demux_cached_range {
    // streams[] is indexed by demux_stream->index
    struct demux_queue **streams;
    int num_streams;

    // Computed from the stream queue's values. These fields (unlike as with
    // demux_queue) are always either NOPTS, or fully valid.
    // (Recomputed by update_seek_ranges().)
    double seek_start, seek_end;

    bool is_bof;            // set if the file begins with this range
    bool is_eof;            // set if the file ends with this range

    // Timed metadata entries; prune_metadata() drops entries from the front
    // of this array.
    struct timed_metadata **metadata;
    int num_metadata;
};
322 
// Bit mask for wrapping a position into the queue->index[] ring buffer. This
// is only a valid mask while index_size is a non-zero power of 2 (for
// index_size == 0 the subtraction wraps around).
#define QUEUE_INDEX_SIZE_MASK(queue) ((queue)->index_size - 1)

// Access the idx-th entry in the given demux_queue.
// Requirement: idx >= 0 && idx < queue->num_index
// (Entry 0 is located at queue->index0; positions wrap at index_size.)
#define QUEUE_INDEX_ENTRY(queue, idx) \
    ((queue)->index[((queue)->index0 + (idx)) & QUEUE_INDEX_SIZE_MASK(queue)])

// Don't index packets whose timestamps are within this amount of time of the
// last index entry (it's better to seek them manually).
#define INDEX_STEP_SIZE 1.0
333 
// One entry of the demux_queue.index[] ring buffer. remove_head_packet()
// drops the first entry when the packet it points to is removed.
struct index_entry {
    double pts;                 // timestamp associated with pkt
    struct demux_packet *pkt;   // packet in the owning queue
};
338 
// A continuous list of cached packets for a single stream/range. There is one
// for each stream and range. Also contains some state for use during demuxing
// (keeping it across seeks makes it easier to resume demuxing).
struct demux_queue {
    struct demux_stream *ds;
    struct demux_cached_range *range;

    // Singly linked packet list (via demux_packet.next). tail is NULL exactly
    // when head is NULL.
    struct demux_packet *head;
    struct demux_packet *tail;

    // The byte size of a packet span is computed as a difference of cum_pos
    // values, with tail_cum_pos acting as the end position of the tail packet.
    uint64_t tail_cum_pos;  // cumulative size including tail packet

    bool correct_dts;       // packet DTS is strictly monotonically increasing
    bool correct_pos;       // packet pos is strictly monotonically increasing
    int64_t last_pos;       // for determining correct_pos
    int64_t last_pos_fixup; // for filling in unset dp->pos values
    double last_dts;        // for determining correct_dts
    double last_ts;         // timestamp of the last packet added to queue

    // for incrementally determining seek PTS range
    struct demux_packet *keyframe_latest;
    struct demux_packet *keyframe_first; // cached value of first KF packet

    // incrementally maintained seek range, possibly invalid
    double seek_start, seek_end;
    double last_pruned;     // timestamp of last pruned keyframe

    bool is_bof;            // started demuxing at beginning of file
    bool is_eof;            // received true EOF here

    // Complete index, though it may skip some entries to reduce density.
    // (Use QUEUE_INDEX_ENTRY() to access entries.)
    struct index_entry *index;  // ring buffer
    size_t index_size;          // size of index[] (0 or a power of 2)
    size_t index0;              // first index entry
    size_t num_index;           // number of index entries (wraps on index_size)
};
375 
// Per-stream state: demuxer side (queue being filled), reader/decoder side
// (reader_head and bitrate estimation), and backward demuxing state.
struct demux_stream {
    struct demux_internal *in;
    struct sh_stream *sh;   // ds->sh->ds == ds
    enum stream_type type;  // equal to sh->type
    int index;              // equal to sh->index
    // --- all fields are protected by in->lock

    void (*wakeup_cb)(void *ctx);
    void *wakeup_cb_ctx;

    // demuxer state
    bool selected;          // user wants packets from this stream
    bool eager;             // try to keep at least 1 packet queued
                            // if false, this stream is disabled, or passively
                            // read (like subtitles)
    bool still_image;       // stream has still video images
    bool refreshing;        // finding old position after track switches
    bool eof;               // end of demuxed stream? (true if no more packets)

    bool global_correct_dts;// all observed so far
    bool global_correct_pos;

    // current queue - used both for reading and demuxing (this is never NULL)
    struct demux_queue *queue;

    // reader (decoder) state (bitrate calculations are part of it because we
    // want to return the bitrate closest to the "current position")
    double base_ts;         // timestamp of the last packet returned to decoder
    double last_br_ts;      // timestamp of last packet bitrate was calculated
    size_t last_br_bytes;   // summed packet sizes since last bitrate calculation
    double bitrate;
    struct demux_packet *reader_head;   // points at current decoder position
    bool skip_to_keyframe;
    bool attached_picture_added;
    bool need_wakeup;       // call wakeup_cb on next reader_head state change
    double force_read_until;// eager=false streams (subs): force read-ahead

    // For demux_internal.dumper. Currently, this is used only temporarily
    // during blocking dumping.
    struct demux_packet *dump_pos;

    // for refresh seeks: pos/dts of last packet returned to reader
    int64_t last_ret_pos;
    double last_ret_dts;

    // Backwards demuxing.
    bool back_need_recheck; // flag for incremental find_backward_restart_pos work
    // pos/dts of the previous keyframe packet returned; always valid if back-
    // demuxing is enabled, and back_restart_eof/back_restart_next are false.
    int64_t back_restart_pos;
    double back_restart_dts;
    bool back_restart_eof; // restart position is at EOF; overrides pos/dts
    bool back_restart_next; // restart before next keyframe; overrides above
    bool back_restarting;   // searching keyframe before restart pos
    // Current PTS lower bound for back demuxing.
    double back_seek_pos;
    // pos/dts of the packet to resume demuxing from when another stream caused
    // a seek backward to get more packets. reader_head will be reset to this
    // packet as soon as it's encountered again.
    int64_t back_resume_pos;
    double back_resume_dts;
    bool back_resuming;     // resuming mode (above fields are valid/used)
    // Set to true if the first packet (keyframe) of a range was returned.
    bool back_range_started;
    // Number of KF packets at start of range yet to return. -1 is used for BOF.
    int back_range_count;
    // Number of KF packets yet to return that are marked as preroll.
    int back_range_preroll;
    // Static packet preroll count.
    int back_preroll;

    // for closed captions (demuxer_feed_caption)
    struct sh_stream *cc;
    bool ignore_eof;        // ignore stream in underrun detection
};
451 
// Forward declarations of file-local (static) functions, so that they can be
// referenced before their definitions.
static void switch_to_fresh_cache_range(struct demux_internal *in);
static void demuxer_sort_chapters(demuxer_t *demuxer);
static void *demux_thread(void *pctx);
static void update_cache(struct demux_internal *in);
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp);
static struct demux_packet *advance_reader_head(struct demux_stream *ds);
static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
                       bool clear_back_state);
static struct demux_packet *compute_keyframe_times(struct demux_packet *pkt,
                                                   double *out_kf_min,
                                                   double *out_kf_max);
static void find_backward_restart_pos(struct demux_stream *ds);
static struct demux_packet *find_seek_target(struct demux_queue *queue,
                                             double pts, int flags);
static void prune_old_packets(struct demux_internal *in);
static void dumper_close(struct demux_internal *in);
static void demux_convert_tags_charset(struct demuxer *demuxer);
469 
get_foward_buffered_bytes(struct demux_stream * ds)470 static uint64_t get_foward_buffered_bytes(struct demux_stream *ds)
471 {
472     if (!ds->reader_head)
473         return 0;
474     return ds->queue->tail_cum_pos - ds->reader_head->cum_pos;
475 }
476 
#if 0
// very expensive check for redundant cached queue state
// (Disabled debugging aid. It walks every packet of every queue in every
// range and asserts that the incrementally maintained values - byte
// accounting, index entries, keyframe pointers, reader position - agree with
// what a full recomputation yields.)
static void check_queue_consistency(struct demux_internal *in)
{
    uint64_t total_bytes = 0;

    assert(in->current_range && in->num_ranges > 0);
    assert(in->current_range == in->ranges[in->num_ranges - 1]);

    for (int n = 0; n < in->num_ranges; n++) {
        struct demux_cached_range *range = in->ranges[n];
        int range_num_packets = 0;

        assert(range->num_streams == in->num_streams);

        for (int i = 0; i < range->num_streams; i++) {
            struct demux_queue *queue = range->streams[i];

            assert(queue->range == range);

            size_t fw_bytes = 0;
            bool is_forward = false;
            bool kf_found = false;
            bool kf1_found = false;
            size_t next_index = 0;
            uint64_t queue_total_bytes = 0;
            for (struct demux_packet *dp = queue->head; dp; dp = dp->next) {
                // Becomes (and stays) true once reader_head was encountered.
                is_forward |= dp == queue->ds->reader_head;
                kf_found |= dp == queue->keyframe_latest;
                kf1_found |= dp == queue->keyframe_first;

                size_t bytes = demux_packet_estimate_total_size(dp);
                total_bytes += bytes;
                queue_total_bytes += bytes;
                if (is_forward) {
                    fw_bytes += bytes;
                    assert(range == in->current_range);
                    assert(queue->ds->queue == queue);
                }
                range_num_packets += 1;

                if (!dp->next)
                    assert(queue->tail == dp);

                // Index entries must reference packets in queue order.
                if (next_index < queue->num_index &&
                    QUEUE_INDEX_ENTRY(queue, next_index).pkt == dp)
                    next_index += 1;
            }
            if (!queue->head)
                assert(!queue->tail);
            // Every index entry must have been matched by a queued packet.
            assert(next_index == queue->num_index);

            // The summed packet sizes must match the cum_pos based size.
            uint64_t queue_total_bytes2 = 0;
            if (queue->head)
                queue_total_bytes2 = queue->tail_cum_pos - queue->head->cum_pos;

            assert(queue_total_bytes == queue_total_bytes2);

            // If the queue is currently used...
            if (queue->ds->queue == queue) {
                // ...reader_head and others must be in the queue.
                assert(is_forward == !!queue->ds->reader_head);
                assert(kf_found == !!queue->keyframe_latest);
                uint64_t fw_bytes2 = get_foward_buffered_bytes(queue->ds);
                assert(fw_bytes == fw_bytes2);
            }

            assert(kf1_found == !!queue->keyframe_first);

            if (range != in->current_range) {
                assert(fw_bytes == 0);
            }

            if (queue->keyframe_latest)
                assert(queue->keyframe_latest->keyframe);

            // The index memory is part of the total_bytes accounting too.
            total_bytes += queue->index_size * sizeof(struct index_entry);
        }

        // Invariant needed by pruning; violation has worse effects than just
        // e.g. broken seeking due to incorrect seek ranges.
        if (range->seek_start != MP_NOPTS_VALUE)
            assert(range_num_packets > 0);
    }

    assert(in->total_bytes == total_bytes);
}
#endif
565 
566 // (this doesn't do most required things for a switch, like updating ds->queue)
set_current_range(struct demux_internal * in,struct demux_cached_range * range)567 static void set_current_range(struct demux_internal *in,
568                               struct demux_cached_range *range)
569 {
570     in->current_range = range;
571 
572     // Move to in->ranges[in->num_ranges-1] (for LRU sorting/invariant)
573     for (int n = 0; n < in->num_ranges; n++) {
574         if (in->ranges[n] == range) {
575             MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
576             break;
577         }
578     }
579     MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
580 }
581 
prune_metadata(struct demux_cached_range * range)582 static void prune_metadata(struct demux_cached_range *range)
583 {
584     int first_needed = 0;
585 
586     if (range->seek_start == MP_NOPTS_VALUE) {
587         first_needed = range->num_metadata;
588     } else {
589         for (int n = 0; n < range->num_metadata ; n++) {
590             if (range->metadata[n]->pts > range->seek_start)
591                 break;
592             first_needed = n;
593         }
594     }
595 
596     // Always preserve the last entry.
597     first_needed = MPMIN(first_needed, range->num_metadata - 1);
598 
599     // (Could make this significantly more efficient for large first_needed,
600     // however that might be very rare and even then it might not matter.)
601     for (int n = 0; n < first_needed; n++) {
602         talloc_free(range->metadata[0]);
603         MP_TARRAY_REMOVE_AT(range->metadata, range->num_metadata, 0);
604     }
605 }
606 
607 // Refresh range->seek_start/end. Idempotent.
update_seek_ranges(struct demux_cached_range * range)608 static void update_seek_ranges(struct demux_cached_range *range)
609 {
610     range->seek_start = range->seek_end = MP_NOPTS_VALUE;
611     range->is_bof = true;
612     range->is_eof = true;
613 
614     double min_start_pts = MP_NOPTS_VALUE;
615     double max_end_pts = MP_NOPTS_VALUE;
616 
617     for (int n = 0; n < range->num_streams; n++) {
618         struct demux_queue *queue = range->streams[n];
619 
620         if (queue->ds->selected && queue->ds->eager) {
621             if (queue->is_bof) {
622                 min_start_pts = MP_PTS_MIN(min_start_pts, queue->seek_start);
623             } else {
624                 range->seek_start =
625                     MP_PTS_MAX(range->seek_start, queue->seek_start);
626             }
627 
628             if (queue->is_eof) {
629                 max_end_pts = MP_PTS_MAX(max_end_pts, queue->seek_end);
630             } else {
631                 range->seek_end = MP_PTS_MIN(range->seek_end, queue->seek_end);
632             }
633 
634             range->is_eof &= queue->is_eof;
635             range->is_bof &= queue->is_bof;
636 
637             bool empty = queue->is_eof && !queue->head;
638             if (queue->seek_start >= queue->seek_end && !empty &&
639                 !(queue->seek_start == queue->seek_end &&
640                   queue->seek_start != MP_NOPTS_VALUE))
641                 goto broken;
642         }
643     }
644 
645     if (range->is_eof)
646         range->seek_end = max_end_pts;
647     if (range->is_bof)
648         range->seek_start = min_start_pts;
649 
650     // Sparse (subtitle) stream behavior is not very clearly defined, but
651     // usually we don't want it to restrict the range of other streams. For
652     // example, if there are subtitle packets at position 5 and 10 seconds, and
653     // the demuxer demuxed the other streams until position 7 seconds, the seek
654     // range end position is 7.
655     // Assume that reading a non-sparse (audio/video) packet gets all sparse
656     // packets that are needed before that non-sparse packet.
657     // This is incorrect in any of these cases:
658     //  - sparse streams only (it's unknown how to determine an accurate range)
659     //  - if sparse streams have non-keyframe packets (we set queue->last_pruned
660     //    to the start of the pruned keyframe range - we'd need the end or so)
661     // We also assume that ds->eager equals to a stream not being sparse
662     // (usually true, except if only sparse streams are selected).
663     // We also rely on the fact that the demuxer position will always be ahead
664     // of the seek_end for audio/video, because they need to prefetch at least
665     // 1 packet to detect the end of a keyframe range. This means that there's
666     // a relatively high guarantee to have all sparse (subtitle) packets within
667     // the seekable range.
668     // As a consequence, the code _never_ checks queue->seek_end for a sparse
669     // queue, as the end of it is implied by the highest PTS of a non-sparse
670     // stream (i.e. the latest demuxer position).
671     // On the other hand, if a sparse packet was pruned, and that packet has
672     // a higher PTS than seek_start for non-sparse queues, that packet is
673     // missing. So the range's seek_start needs to be adjusted accordingly.
674     for (int n = 0; n < range->num_streams; n++) {
675         struct demux_queue *queue = range->streams[n];
676         if (queue->ds->selected && !queue->ds->eager &&
677             queue->last_pruned != MP_NOPTS_VALUE &&
678             range->seek_start != MP_NOPTS_VALUE)
679         {
680             // (last_pruned is _exclusive_ to the seekable range, so add a small
681             // value to exclude it from the valid range.)
682             range->seek_start =
683                 MP_PTS_MAX(range->seek_start, queue->last_pruned + 0.1);
684         }
685     }
686 
687     if (range->seek_start >= range->seek_end)
688         goto broken;
689 
690     prune_metadata(range);
691     return;
692 
693 broken:
694     range->seek_start = range->seek_end = MP_NOPTS_VALUE;
695     prune_metadata(range);
696 }
697 
698 // Remove queue->head from the queue.
remove_head_packet(struct demux_queue * queue)699 static void remove_head_packet(struct demux_queue *queue)
700 {
701     struct demux_packet *dp = queue->head;
702 
703     assert(queue->ds->reader_head != dp);
704     if (queue->keyframe_first == dp)
705         queue->keyframe_first = NULL;
706     if (queue->keyframe_latest == dp)
707         queue->keyframe_latest = NULL;
708     queue->is_bof = false;
709 
710     uint64_t end_pos = dp->next ? dp->next->cum_pos : queue->tail_cum_pos;
711     queue->ds->in->total_bytes -= end_pos - dp->cum_pos;
712 
713     if (queue->num_index && queue->index[queue->index0].pkt == dp) {
714         queue->index0 = (queue->index0 + 1) & QUEUE_INDEX_SIZE_MASK(queue);
715         queue->num_index -= 1;
716     }
717 
718     queue->head = dp->next;
719     if (!queue->head)
720         queue->tail = NULL;
721 
722     talloc_free(dp);
723 }
724 
free_index(struct demux_queue * queue)725 static void free_index(struct demux_queue *queue)
726 {
727     struct demux_stream *ds = queue->ds;
728     struct demux_internal *in = ds->in;
729 
730     in->total_bytes -= queue->index_size * sizeof(queue->index[0]);
731     queue->index_size = 0;
732     queue->index0 = 0;
733     queue->num_index = 0;
734     TA_FREEP(&queue->index);
735 }
736 
clear_queue(struct demux_queue * queue)737 static void clear_queue(struct demux_queue *queue)
738 {
739     struct demux_stream *ds = queue->ds;
740     struct demux_internal *in = ds->in;
741 
742     if (queue->head)
743         in->total_bytes -= queue->tail_cum_pos - queue->head->cum_pos;
744 
745     free_index(queue);
746 
747     struct demux_packet *dp = queue->head;
748     while (dp) {
749         struct demux_packet *dn = dp->next;
750         assert(ds->reader_head != dp);
751         talloc_free(dp);
752         dp = dn;
753     }
754     queue->head = queue->tail = NULL;
755     queue->keyframe_first = NULL;
756     queue->keyframe_latest = NULL;
757     queue->seek_start = queue->seek_end = queue->last_pruned = MP_NOPTS_VALUE;
758 
759     queue->correct_dts = queue->correct_pos = true;
760     queue->last_pos = -1;
761     queue->last_ts = queue->last_dts = MP_NOPTS_VALUE;
762     queue->last_pos_fixup = -1;
763 
764     queue->is_eof = false;
765     queue->is_bof = false;
766 }
767 
clear_cached_range(struct demux_internal * in,struct demux_cached_range * range)768 static void clear_cached_range(struct demux_internal *in,
769                                struct demux_cached_range *range)
770 {
771     for (int n = 0; n < range->num_streams; n++)
772         clear_queue(range->streams[n]);
773 
774     for (int n = 0; n < range->num_metadata; n++)
775         talloc_free(range->metadata[n]);
776     range->num_metadata = 0;
777 
778     update_seek_ranges(range);
779 }
780 
781 // Remove ranges with no data (except in->current_range). Also remove excessive
782 // ranges.
free_empty_cached_ranges(struct demux_internal * in)783 static void free_empty_cached_ranges(struct demux_internal *in)
784 {
785     while (1) {
786         struct demux_cached_range *worst = NULL;
787 
788         int end = in->num_ranges - 1;
789 
790         // (Not set during early init or late destruction.)
791         if (in->current_range) {
792             assert(in->current_range && in->num_ranges > 0);
793             assert(in->current_range == in->ranges[in->num_ranges - 1]);
794             end -= 1;
795         }
796 
797         for (int n = end; n >= 0; n--) {
798             struct demux_cached_range *range = in->ranges[n];
799             if (range->seek_start == MP_NOPTS_VALUE || !in->seekable_cache) {
800                 clear_cached_range(in, range);
801                 MP_TARRAY_REMOVE_AT(in->ranges, in->num_ranges, n);
802                 for (int i = 0; i < range->num_streams; i++)
803                     talloc_free(range->streams[i]);
804                 talloc_free(range);
805             } else {
806                 if (!worst || (range->seek_end - range->seek_start <
807                                worst->seek_end - worst->seek_start))
808                     worst = range;
809             }
810         }
811 
812         if (in->num_ranges <= MAX_SEEK_RANGES || !worst)
813             break;
814 
815         clear_cached_range(in, worst);
816     }
817 }
818 
ds_clear_reader_queue_state(struct demux_stream * ds)819 static void ds_clear_reader_queue_state(struct demux_stream *ds)
820 {
821     ds->reader_head = NULL;
822     ds->eof = false;
823     ds->need_wakeup = true;
824 }
825 
ds_clear_reader_state(struct demux_stream * ds,bool clear_back_state)826 static void ds_clear_reader_state(struct demux_stream *ds,
827                                   bool clear_back_state)
828 {
829     ds_clear_reader_queue_state(ds);
830 
831     ds->base_ts = ds->last_br_ts = MP_NOPTS_VALUE;
832     ds->last_br_bytes = 0;
833     ds->bitrate = -1;
834     ds->skip_to_keyframe = false;
835     ds->attached_picture_added = false;
836     ds->last_ret_pos = -1;
837     ds->last_ret_dts = MP_NOPTS_VALUE;
838     ds->force_read_until = MP_NOPTS_VALUE;
839 
840     if (clear_back_state) {
841         ds->back_restart_pos = -1;
842         ds->back_restart_dts = MP_NOPTS_VALUE;
843         ds->back_restart_eof = false;
844         ds->back_restart_next = ds->in->back_demuxing;
845         ds->back_restarting = ds->in->back_demuxing && ds->eager;
846         ds->back_seek_pos = MP_NOPTS_VALUE;
847         ds->back_resume_pos = -1;
848         ds->back_resume_dts = MP_NOPTS_VALUE;
849         ds->back_resuming = false;
850         ds->back_range_started = false;
851         ds->back_range_count = 0;
852         ds->back_range_preroll = 0;
853     }
854 }
855 
856 // called locked, from user thread only
clear_reader_state(struct demux_internal * in,bool clear_back_state)857 static void clear_reader_state(struct demux_internal *in,
858                                bool clear_back_state)
859 {
860     for (int n = 0; n < in->num_streams; n++)
861         ds_clear_reader_state(in->streams[n]->ds, clear_back_state);
862     in->warned_queue_overflow = false;
863     in->d_user->filepos = -1; // implicitly synchronized
864     in->blocked = false;
865     in->need_back_seek = false;
866 }
867 
868 // Call if the observed reader state on this stream somehow changes. The wakeup
869 // is skipped if the reader successfully read a packet, because that means we
870 // expect it to come back and ask for more.
wakeup_ds(struct demux_stream * ds)871 static void wakeup_ds(struct demux_stream *ds)
872 {
873     if (ds->need_wakeup) {
874         if (ds->wakeup_cb) {
875             ds->wakeup_cb(ds->wakeup_cb_ctx);
876         } else if (ds->in->wakeup_cb) {
877             ds->in->wakeup_cb(ds->in->wakeup_cb_ctx);
878         }
879         ds->need_wakeup = false;
880         pthread_cond_signal(&ds->in->wakeup);
881     }
882 }
883 
update_stream_selection_state(struct demux_internal * in,struct demux_stream * ds)884 static void update_stream_selection_state(struct demux_internal *in,
885                                           struct demux_stream *ds)
886 {
887     ds->eof = false;
888     ds->refreshing = false;
889 
890     // We still have to go over the whole stream list to update ds->eager for
891     // other streams too, because they depend on other stream's selections.
892 
893     bool any_av_streams = false;
894     bool any_streams = false;
895 
896     for (int n = 0; n < in->num_streams; n++) {
897         struct demux_stream *s = in->streams[n]->ds;
898 
899         s->still_image = s->sh->still_image;
900         s->eager = s->selected && !s->sh->attached_picture;
901         if (s->eager && !s->still_image)
902             any_av_streams |= s->type != STREAM_SUB;
903         any_streams |= s->selected;
904     }
905 
906     // Subtitles are only eagerly read if there are no other eagerly read
907     // streams.
908     if (any_av_streams) {
909         for (int n = 0; n < in->num_streams; n++) {
910             struct demux_stream *s = in->streams[n]->ds;
911 
912             if (s->type == STREAM_SUB)
913                 s->eager = false;
914         }
915     }
916 
917     if (!any_streams)
918         in->blocked = false;
919 
920     ds_clear_reader_state(ds, true);
921 
922     // Make sure any stream reselection or addition is reflected in the seek
923     // ranges, and also get rid of data that is not needed anymore (or
924     // rather, which can't be kept consistent). This has to happen after we've
925     // updated all the subtle state (like s->eager).
926     for (int n = 0; n < in->num_ranges; n++) {
927         struct demux_cached_range *range = in->ranges[n];
928 
929         if (!ds->selected)
930             clear_queue(range->streams[ds->index]);
931 
932         update_seek_ranges(range);
933     }
934 
935     free_empty_cached_ranges(in);
936 
937     wakeup_ds(ds);
938 }
939 
demux_set_ts_offset(struct demuxer * demuxer,double offset)940 void demux_set_ts_offset(struct demuxer *demuxer, double offset)
941 {
942     struct demux_internal *in = demuxer->in;
943     pthread_mutex_lock(&in->lock);
944     in->ts_offset = offset;
945     pthread_mutex_unlock(&in->lock);
946 }
947 
add_missing_streams(struct demux_internal * in,struct demux_cached_range * range)948 static void add_missing_streams(struct demux_internal *in,
949                                 struct demux_cached_range *range)
950 {
951     for (int n = range->num_streams; n < in->num_streams; n++) {
952         struct demux_stream *ds = in->streams[n]->ds;
953 
954         struct demux_queue *queue = talloc_ptrtype(NULL, queue);
955         *queue = (struct demux_queue){
956             .ds = ds,
957             .range = range,
958         };
959         clear_queue(queue);
960         MP_TARRAY_APPEND(range, range->streams, range->num_streams, queue);
961         assert(range->streams[ds->index] == queue);
962     }
963 }
964 
965 // Allocate a new sh_stream of the given type. It either has to be released
966 // with talloc_free(), or added to a demuxer with demux_add_sh_stream(). You
967 // cannot add or read packets from the stream before it has been added.
968 // type may be changed later, but only before demux_add_sh_stream().
demux_alloc_sh_stream(enum stream_type type)969 struct sh_stream *demux_alloc_sh_stream(enum stream_type type)
970 {
971     struct sh_stream *sh = talloc_ptrtype(NULL, sh);
972     *sh = (struct sh_stream) {
973         .type = type,
974         .index = -1,
975         .ff_index = -1,     // may be overwritten by demuxer
976         .demuxer_id = -1,   // ... same
977         .codec = talloc_zero(sh, struct mp_codec_params),
978         .tags = talloc_zero(sh, struct mp_tags),
979     };
980     sh->codec->type = type;
981     return sh;
982 }
983 
984 // Add a new sh_stream to the demuxer. Note that as soon as the stream has been
985 // added, it must be immutable, and must not be released (this will happen when
986 // the demuxer is destroyed).
demux_add_sh_stream_locked(struct demux_internal * in,struct sh_stream * sh)987 static void demux_add_sh_stream_locked(struct demux_internal *in,
988                                        struct sh_stream *sh)
989 {
990     assert(!sh->ds); // must not be added yet
991 
992     sh->index = in->num_streams;
993 
994     sh->ds = talloc(sh, struct demux_stream);
995     *sh->ds = (struct demux_stream) {
996         .in = in,
997         .sh = sh,
998         .type = sh->type,
999         .index = sh->index,
1000         .global_correct_dts = true,
1001         .global_correct_pos = true,
1002     };
1003 
1004     struct demux_stream *ds = sh->ds;
1005 
1006     if (!sh->codec->codec)
1007         sh->codec->codec = "";
1008 
1009     if (sh->ff_index < 0)
1010         sh->ff_index = sh->index;
1011 
1012     MP_TARRAY_APPEND(in, in->streams, in->num_streams, sh);
1013     assert(in->streams[sh->index] == sh);
1014 
1015     if (in->current_range) {
1016         for (int n = 0; n < in->num_ranges; n++)
1017             add_missing_streams(in, in->ranges[n]);
1018 
1019         sh->ds->queue = in->current_range->streams[sh->ds->index];
1020     }
1021 
1022     update_stream_selection_state(in, sh->ds);
1023 
1024     switch (ds->type) {
1025     case STREAM_AUDIO:
1026         ds->back_preroll = in->opts->audio_back_preroll;
1027         if (ds->back_preroll < 0) { // auto
1028             ds->back_preroll = mp_codec_is_lossless(sh->codec->codec) ? 0 : 1;
1029             if (sh->codec->codec && (strcmp(sh->codec->codec, "opus") == 0 ||
1030                                      strcmp(sh->codec->codec, "vorbis") == 0 ||
1031                                      strcmp(sh->codec->codec, "mp3") == 0))
1032                 ds->back_preroll = 2;
1033         }
1034         break;
1035     case STREAM_VIDEO:
1036         ds->back_preroll = in->opts->video_back_preroll;
1037         if (ds->back_preroll < 0)
1038             ds->back_preroll = 0; // auto
1039         break;
1040     }
1041 
1042     if (!ds->sh->attached_picture) {
1043         // Typically this is used for webradio, so any stream will do.
1044         if (!in->metadata_stream)
1045             in->metadata_stream = sh;
1046     }
1047 
1048     in->events |= DEMUX_EVENT_STREAMS;
1049     if (in->wakeup_cb)
1050         in->wakeup_cb(in->wakeup_cb_ctx);
1051 }
1052 
1053 // For demuxer implementations only.
demux_add_sh_stream(struct demuxer * demuxer,struct sh_stream * sh)1054 void demux_add_sh_stream(struct demuxer *demuxer, struct sh_stream *sh)
1055 {
1056     struct demux_internal *in = demuxer->in;
1057     assert(demuxer == in->d_thread);
1058     pthread_mutex_lock(&in->lock);
1059     demux_add_sh_stream_locked(in, sh);
1060     pthread_mutex_unlock(&in->lock);
1061 }
1062 
1063 // Return a stream with the given index. Since streams can only be added during
1064 // the lifetime of the demuxer, it is guaranteed that an index within the valid
1065 // range [0, demux_get_num_stream()) always returns a valid sh_stream pointer,
1066 // which will be valid until the demuxer is destroyed.
demux_get_stream(struct demuxer * demuxer,int index)1067 struct sh_stream *demux_get_stream(struct demuxer *demuxer, int index)
1068 {
1069     struct demux_internal *in = demuxer->in;
1070     pthread_mutex_lock(&in->lock);
1071     assert(index >= 0 && index < in->num_streams);
1072     struct sh_stream *r = in->streams[index];
1073     pthread_mutex_unlock(&in->lock);
1074     return r;
1075 }
1076 
1077 // See demux_get_stream().
demux_get_num_stream(struct demuxer * demuxer)1078 int demux_get_num_stream(struct demuxer *demuxer)
1079 {
1080     struct demux_internal *in = demuxer->in;
1081     pthread_mutex_lock(&in->lock);
1082     int r = in->num_streams;
1083     pthread_mutex_unlock(&in->lock);
1084     return r;
1085 }
1086 
1087 // It's UB to call anything but demux_dealloc() on the demuxer after this.
demux_shutdown(struct demux_internal * in)1088 static void demux_shutdown(struct demux_internal *in)
1089 {
1090     struct demuxer *demuxer = in->d_user;
1091 
1092     if (in->recorder) {
1093         mp_recorder_destroy(in->recorder);
1094         in->recorder = NULL;
1095     }
1096 
1097     dumper_close(in);
1098 
1099     if (demuxer->desc->close)
1100         demuxer->desc->close(in->d_thread);
1101     demuxer->priv = NULL;
1102     in->d_thread->priv = NULL;
1103 
1104     demux_flush(demuxer);
1105     assert(in->total_bytes == 0);
1106 
1107     in->current_range = NULL;
1108     free_empty_cached_ranges(in);
1109 
1110     talloc_free(in->cache);
1111     in->cache = NULL;
1112 
1113     if (in->owns_stream)
1114         free_stream(demuxer->stream);
1115     demuxer->stream = NULL;
1116 }
1117 
demux_dealloc(struct demux_internal * in)1118 static void demux_dealloc(struct demux_internal *in)
1119 {
1120     for (int n = 0; n < in->num_streams; n++)
1121         talloc_free(in->streams[n]);
1122     pthread_mutex_destroy(&in->lock);
1123     pthread_cond_destroy(&in->wakeup);
1124     talloc_free(in->d_user);
1125 }
1126 
demux_free(struct demuxer * demuxer)1127 void demux_free(struct demuxer *demuxer)
1128 {
1129     if (!demuxer)
1130         return;
1131     struct demux_internal *in = demuxer->in;
1132     assert(demuxer == in->d_user);
1133 
1134     demux_stop_thread(demuxer);
1135     demux_shutdown(in);
1136     demux_dealloc(in);
1137 }
1138 
1139 // Start closing the demuxer and eventually freeing the demuxer asynchronously.
1140 // You must not access the demuxer once this has been started. Once the demuxer
1141 // is shutdown, the wakeup callback is invoked. Then you need to call
1142 // demux_free_async_finish() to end the operation (it must not be called from
1143 // the wakeup callback).
1144 // This can return NULL. Then the demuxer cannot be free'd asynchronously, and
1145 // you need to call demux_free() instead.
demux_free_async(struct demuxer * demuxer)1146 struct demux_free_async_state *demux_free_async(struct demuxer *demuxer)
1147 {
1148     struct demux_internal *in = demuxer->in;
1149     assert(demuxer == in->d_user);
1150 
1151     if (!in->threading)
1152         return NULL;
1153 
1154     pthread_mutex_lock(&in->lock);
1155     in->thread_terminate = true;
1156     in->shutdown_async = true;
1157     pthread_cond_signal(&in->wakeup);
1158     pthread_mutex_unlock(&in->lock);
1159 
1160     return (struct demux_free_async_state *)demuxer->in; // lies
1161 }
1162 
1163 // As long as state is valid, you can call this to request immediate abort.
1164 // Roughly behaves as demux_cancel_and_free(), except you still need to wait
1165 // for the result.
demux_free_async_force(struct demux_free_async_state * state)1166 void demux_free_async_force(struct demux_free_async_state *state)
1167 {
1168     struct demux_internal *in = (struct demux_internal *)state; // reverse lies
1169 
1170     mp_cancel_trigger(in->d_user->cancel);
1171 }
1172 
1173 // Check whether the demuxer is shutdown yet. If not, return false, and you
1174 // need to call this again in the future (preferably after you were notified by
1175 // the wakeup callback). If yes, deallocate all state, and return true (in
1176 // particular, the state ptr becomes invalid, and the wakeup callback will never
1177 // be called again).
demux_free_async_finish(struct demux_free_async_state * state)1178 bool demux_free_async_finish(struct demux_free_async_state *state)
1179 {
1180     struct demux_internal *in = (struct demux_internal *)state; // reverse lies
1181 
1182     pthread_mutex_lock(&in->lock);
1183     bool busy = in->shutdown_async;
1184     pthread_mutex_unlock(&in->lock);
1185 
1186     if (busy)
1187         return false;
1188 
1189     demux_stop_thread(in->d_user);
1190     demux_dealloc(in);
1191     return true;
1192 }
1193 
1194 // Like demux_free(), but trigger an abort, which will force the demuxer to
1195 // terminate immediately. If this wasn't opened with demux_open_url(), there is
1196 // some chance this will accidentally abort other things via demuxer->cancel.
demux_cancel_and_free(struct demuxer * demuxer)1197 void demux_cancel_and_free(struct demuxer *demuxer)
1198 {
1199     if (!demuxer)
1200         return;
1201     mp_cancel_trigger(demuxer->cancel);
1202     demux_free(demuxer);
1203 }
1204 
1205 // Start the demuxer thread, which reads ahead packets on its own.
demux_start_thread(struct demuxer * demuxer)1206 void demux_start_thread(struct demuxer *demuxer)
1207 {
1208     struct demux_internal *in = demuxer->in;
1209     assert(demuxer == in->d_user);
1210 
1211     if (!in->threading) {
1212         in->threading = true;
1213         if (pthread_create(&in->thread, NULL, demux_thread, in))
1214             in->threading = false;
1215     }
1216 }
1217 
demux_stop_thread(struct demuxer * demuxer)1218 void demux_stop_thread(struct demuxer *demuxer)
1219 {
1220     struct demux_internal *in = demuxer->in;
1221     assert(demuxer == in->d_user);
1222 
1223     if (in->threading) {
1224         pthread_mutex_lock(&in->lock);
1225         in->thread_terminate = true;
1226         pthread_cond_signal(&in->wakeup);
1227         pthread_mutex_unlock(&in->lock);
1228         pthread_join(in->thread, NULL);
1229         in->threading = false;
1230         in->thread_terminate = false;
1231     }
1232 }
1233 
1234 // The demuxer thread will call cb(ctx) if there's a new packet, or EOF is reached.
demux_set_wakeup_cb(struct demuxer * demuxer,void (* cb)(void * ctx),void * ctx)1235 void demux_set_wakeup_cb(struct demuxer *demuxer, void (*cb)(void *ctx), void *ctx)
1236 {
1237     struct demux_internal *in = demuxer->in;
1238     pthread_mutex_lock(&in->lock);
1239     in->wakeup_cb = cb;
1240     in->wakeup_cb_ctx = ctx;
1241     pthread_mutex_unlock(&in->lock);
1242 }
1243 
demux_start_prefetch(struct demuxer * demuxer)1244 void demux_start_prefetch(struct demuxer *demuxer)
1245 {
1246     struct demux_internal *in = demuxer->in;
1247     assert(demuxer == in->d_user);
1248 
1249     pthread_mutex_lock(&in->lock);
1250     in->reading = true;
1251     pthread_cond_signal(&in->wakeup);
1252     pthread_mutex_unlock(&in->lock);
1253 }
1254 
stream_type_name(enum stream_type type)1255 const char *stream_type_name(enum stream_type type)
1256 {
1257     switch (type) {
1258     case STREAM_VIDEO:  return "video";
1259     case STREAM_AUDIO:  return "audio";
1260     case STREAM_SUB:    return "sub";
1261     default:            return "unknown";
1262     }
1263 }
1264 
demuxer_get_cc_track_locked(struct sh_stream * stream)1265 static struct sh_stream *demuxer_get_cc_track_locked(struct sh_stream *stream)
1266 {
1267     struct sh_stream *sh = stream->ds->cc;
1268 
1269     if (!sh) {
1270         sh = demux_alloc_sh_stream(STREAM_SUB);
1271         if (!sh)
1272             return NULL;
1273         sh->codec->codec = "eia_608";
1274         sh->default_track = true;
1275         stream->ds->cc = sh;
1276         demux_add_sh_stream_locked(stream->ds->in, sh);
1277         sh->ds->ignore_eof = true;
1278     }
1279 
1280     return sh;
1281 }
1282 
// Queue a closed caption packet on the CC track belonging to the given
// stream (creating the track if needed). The packet is passed on to the CC
// stream's queue, or freed if no CC track could be obtained.
void demuxer_feed_caption(struct sh_stream *stream, demux_packet_t *dp)
{
    struct demux_internal *in = stream->ds->in;

    pthread_mutex_lock(&in->lock);

    struct sh_stream *cc = demuxer_get_cc_track_locked(stream);
    if (cc) {
        dp->keyframe = true;
        // Strip the timestamp offset from the incoming timestamps.
        dp->pts = MP_ADD_PTS(dp->pts, -in->ts_offset);
        dp->dts = MP_ADD_PTS(dp->dts, -in->ts_offset);
        dp->stream = cc->index;
        add_packet_locked(cc, dp);
        pthread_mutex_unlock(&in->lock);
        return;
    }

    pthread_mutex_unlock(&in->lock);
    talloc_free(dp);
}
1302 
error_on_backward_demuxing(struct demux_internal * in)1303 static void error_on_backward_demuxing(struct demux_internal *in)
1304 {
1305     if (!in->back_demuxing)
1306         return;
1307     MP_ERR(in, "Disabling backward demuxing.\n");
1308     in->back_demuxing = false;
1309     clear_reader_state(in, true);
1310 }
1311 
perform_backward_seek(struct demux_internal * in)1312 static void perform_backward_seek(struct demux_internal *in)
1313 {
1314     double target = MP_NOPTS_VALUE;
1315 
1316     for (int n = 0; n < in->num_streams; n++) {
1317         struct demux_stream *ds = in->streams[n]->ds;
1318 
1319         if (ds->reader_head && !ds->back_restarting && !ds->back_resuming &&
1320             ds->eager)
1321         {
1322             ds->back_resuming = true;
1323             ds->back_resume_pos = ds->reader_head->pos;
1324             ds->back_resume_dts = ds->reader_head->dts;
1325         }
1326 
1327         target = MP_PTS_MIN(target, ds->back_seek_pos);
1328     }
1329 
1330     target = MP_PTS_OR_DEF(target, in->d_thread->start_time);
1331 
1332     MP_VERBOSE(in, "triggering backward seek to get more packets\n");
1333     queue_seek(in, target, SEEK_SATAN | SEEK_HR, false);
1334     in->reading = true;
1335 
1336     // Don't starve other threads.
1337     pthread_mutex_unlock(&in->lock);
1338     pthread_mutex_lock(&in->lock);
1339 }
1340 
1341 // For incremental backward demuxing search work.
check_backward_seek(struct demux_internal * in)1342 static void check_backward_seek(struct demux_internal *in)
1343 {
1344     in->back_any_need_recheck = false;
1345 
1346     for (int n = 0; n < in->num_streams; n++) {
1347         struct demux_stream *ds = in->streams[n]->ds;
1348 
1349         if (ds->back_need_recheck)
1350             find_backward_restart_pos(ds);
1351     }
1352 }
1353 
// Search for a packet to resume demuxing from.
// The implementation of this function is quite awkward, because the packet
// queue is a singly linked list without back links, while it needs to search
// backwards.
// This is the core of backward demuxing.
//
// Does nothing unless ds->back_restarting is set. Possible outcomes:
//  - Not enough data queued yet: return without changing the restart state
//    and wait for more packets.
//  - Resume target found: reader_head is advanced to it, the next restart
//    position and back_range_count/back_range_preroll are recorded,
//    back_restarting is cleared, and the reader is woken up.
//  - Beginning of file reached: back_range_count is set to -1,
//    back_restarting is cleared, and the reader is woken up.
//  - Target lies before the queued data: either retarget reader_head to an
//    earlier queued packet and schedule a recheck, or request a backward seek
//    via in->need_back_seek (see resume_earlier below).
static void find_backward_restart_pos(struct demux_stream *ds)
{
    struct demux_internal *in = ds->in;

    ds->back_need_recheck = false;
    if (!ds->back_restarting)
        return;

    // The range of queued packets that is searched: [reader_head, tail].
    struct demux_packet *first = ds->reader_head;
    struct demux_packet *last = ds->queue->tail;

    if (first && !first->keyframe)
        MP_WARN(in, "Queue not starting on keyframe.\n");

    // Packet at back_restart_pos. (Note: we don't actually need it, only the
    // packet immediately before it. But same effort.)
    // If this is NULL, look for EOF (resume from very last keyframe).
    struct demux_packet *back_restart = NULL;

    if (ds->back_restart_next) {
        // Initial state. Switch to one of the other modi.

        for (struct demux_packet *cur = first; cur; cur = cur->next) {
            // Restart for next keyframe after reader_head.
            if (cur != first && cur->keyframe) {
                ds->back_restart_dts = cur->dts;
                ds->back_restart_pos = cur->pos;
                ds->back_restart_eof = false;
                ds->back_restart_next = false;
                break;
            }
        }

        if (ds->back_restart_next && ds->eof) {
            // Restart from end if nothing was found.
            ds->back_restart_eof = true;
            ds->back_restart_next = false;
        }

        // Neither a following keyframe nor EOF seen yet; wait for more data.
        if (ds->back_restart_next)
            return;
    }

    if (ds->back_restart_eof) {
        // We're trying to find EOF (without discarding packets). Only continue
        // if we really reach EOF.
        if (!ds->eof)
            return;
    } else if (!first && ds->eof) {
        // Reached EOF during normal backward demuxing. We probably returned the
        // last keyframe range to user. Need to resume at an earlier position.
        // Fall through, hit the no-keyframe case (and possibly the BOF check
        // if there are no packets at all), and then resume_earlier.
    } else if (!first) {
        return; // no packets yet
    } else {
        assert(last);

        // Only those of dts/pos that are flagged as globally correct for
        // this stream are used for the comparisons below.
        if ((ds->global_correct_dts && last->dts < ds->back_restart_dts) ||
            (ds->global_correct_pos && last->pos < ds->back_restart_pos))
            return; // restart pos not reached yet

        // The target we're searching for is apparently before the start of the
        // queue.
        if ((ds->global_correct_dts && first->dts > ds->back_restart_dts) ||
            (ds->global_correct_pos && first->pos > ds->back_restart_pos))
            goto resume_earlier; // current position is too late; seek back


        // Locate the packet exactly at the restart position.
        for (struct demux_packet *cur = first; cur; cur = cur->next) {
            if ((ds->global_correct_dts && cur->dts == ds->back_restart_dts) ||
                (ds->global_correct_pos && cur->pos == ds->back_restart_pos))
            {
                back_restart = cur;
                break;
            }
        }

        if (!back_restart) {
            // The packet should have been in the searched range; maybe dts/pos
            // determinism assumptions were broken.
            MP_ERR(in, "Demuxer not cooperating.\n");
            error_on_backward_demuxing(in);
            return;
        }
    }

    // Find where to restart demuxing. It's usually the last keyframe packet
    // before restart_pos, but might be up to back_preroll + batch keyframe
    // packets earlier.

    // (Normally, we'd just iterate backwards, but no back links.)
    // Count the keyframes in [first, back_restart), and remember the last one.
    int num_kf = 0;
    struct demux_packet *pre_1 = NULL; // idiotic "optimization" for total=1
    for (struct demux_packet *dp = first; dp != back_restart; dp = dp->next) {
        if (dp->keyframe) {
            num_kf++;
            pre_1 = dp;
        }
    }

    // Number of renderable keyframes to return to user.
    // (Excludes preroll, which is decoded by user, but then discarded.)
    int batch = MPMAX(in->opts->back_batch[ds->type], 1);
    // Number of keyframes to return to the user in total.
    int total = batch + ds->back_preroll;

    assert(total >= 1);

    // Whether the searched range starts at the beginning of the file: the
    // queue must be flagged as such, and either the reader is at the queue
    // head, or the last seek position was before the queue's seek range.
    bool is_bof = ds->queue->is_bof &&
        (first == ds->queue->head || ds->back_seek_pos < ds->queue->seek_start);

    struct demux_packet *target = NULL; // resume pos
    // nr. of keyframes, incl. target, excl. restart_pos
    // (At BOF, settle for however many keyframes there are.)
    int got_total = num_kf < total && is_bof ? num_kf : total;
    int got_preroll = MPMAX(got_total - batch, 0);

    if (got_total == 1) {
        target = pre_1;
    } else if (got_total <= num_kf) {
        // Pick the got_total-th keyframe counted from the end of the range.
        int cur_kf = 0;
        for (struct demux_packet *dp = first; dp != back_restart; dp = dp->next) {
            if (dp->keyframe) {
                if (num_kf - cur_kf == got_total) {
                    target = dp;
                    break;
                }
                cur_kf++;
            }
        }
    }

    if (!target) {
        if (is_bof) {
            // Nothing left to return; back_range_count = -1 marks this state.
            MP_VERBOSE(in, "BOF for stream %d\n", ds->index);
            ds->back_restarting = false;
            ds->back_range_started = false;
            ds->back_range_count = -1;
            ds->back_range_preroll = 0;
            ds->need_wakeup = true;
            wakeup_ds(ds);
            return;
        }
        // Not enough keyframes queued before the restart position.
        goto resume_earlier;
    }

    // Skip reader_head from previous keyframe to current one.
    // Or if preroll is involved, the first preroll packet.
    while (ds->reader_head != target) {
        if (!advance_reader_head(ds))
            assert(0); // target must be in list
    }

    // Remember where this range starts, for subsequent backward seeks.
    double seek_pts;
    compute_keyframe_times(target, &seek_pts, NULL);
    if (seek_pts != MP_NOPTS_VALUE)
        ds->back_seek_pos = seek_pts;

    // For next backward adjust action.
    // The next restart position is the first keyframe after the preroll
    // keyframes, counted from target.
    struct demux_packet *restart_pkt = NULL;
    int kf_pos = 0;
    for (struct demux_packet *dp = target; dp; dp = dp->next) {
        if (dp->keyframe) {
            if (kf_pos == got_preroll) {
                restart_pkt = dp;
                break;
            }
            kf_pos++;
        }
    }
    assert(restart_pkt);
    ds->back_restart_dts = restart_pkt->dts;
    ds->back_restart_pos = restart_pkt->pos;

    // Done: publish the new range and wake up the reader.
    ds->back_restarting = false;
    ds->back_range_started = false;
    ds->back_range_count = got_total;
    ds->back_range_preroll = got_preroll;
    ds->need_wakeup = true;
    wakeup_ds(ds);
    return;

resume_earlier:
    // We want to seek back to get earlier packets. But before we do this, we
    // must be sure that other streams have initialized their state. The only
    // time when this state is not initialized is right after the seek that
    // started backward demuxing (not any subsequent backstep seek). If this
    // initialization is omitted, the stream would try to start demuxing from
    // the "current" position. If another stream backstepped before that, the
    // other stream will miss the original seek target, and start playback from
    // a position that is too early.
    for (int n = 0; n < in->num_streams; n++) {
        struct demux_stream *ds2 = in->streams[n]->ds;
        if (ds2 == ds || !ds2->eager)
            continue;

        if (ds2->back_restarting && ds2->back_restart_next) {
            MP_VERBOSE(in, "delaying stream %d for %d\n", ds->index, ds2->index);
            return;
        }
    }

    if (ds->back_seek_pos != MP_NOPTS_VALUE) {
        // First try to find a packet slightly before the last seek position
        // via find_seek_target() on this stream's queue (presumably a lookup
        // in already queued data - TODO confirm).
        struct demux_packet *t =
            find_seek_target(ds->queue, ds->back_seek_pos - 0.001, 0);
        if (t && t != ds->reader_head) {
            // Found one: restart reading from there, and have the search
            // repeated via check_backward_seek().
            double pts;
            compute_keyframe_times(t, &pts, NULL);
            ds->back_seek_pos = MP_PTS_MIN(ds->back_seek_pos, pts);
            ds_clear_reader_state(ds, false);
            ds->reader_head = t;
            ds->back_need_recheck = true;
            in->back_any_need_recheck = true;
            pthread_cond_signal(&in->wakeup);
        } else {
            // Nothing usable found: move the seek position back by the
            // configured step, and request an actual backward seek.
            ds->back_seek_pos -= in->opts->back_seek_size;
            in->need_back_seek = true;
        }
    }
}
1579 
1580 // Process that one or multiple packets were added.
back_demux_see_packets(struct demux_stream * ds)1581 static void back_demux_see_packets(struct demux_stream *ds)
1582 {
1583     struct demux_internal *in = ds->in;
1584 
1585     if (!ds->selected || !in->back_demuxing || !ds->eager)
1586         return;
1587 
1588     assert(!(ds->back_resuming && ds->back_restarting));
1589 
1590     if (!ds->global_correct_dts && !ds->global_correct_pos) {
1591         MP_ERR(in, "Can't demux backward due to demuxer problems.\n");
1592         error_on_backward_demuxing(in);
1593         return;
1594     }
1595 
1596     while (ds->back_resuming && ds->reader_head) {
1597         struct demux_packet *head = ds->reader_head;
1598         if ((ds->global_correct_dts && head->dts == ds->back_resume_dts) ||
1599             (ds->global_correct_pos && head->pos == ds->back_resume_pos))
1600         {
1601             ds->back_resuming = false;
1602             ds->need_wakeup = true;
1603             wakeup_ds(ds); // probably
1604             break;
1605         }
1606         advance_reader_head(ds);
1607     }
1608 
1609     if (ds->back_restarting)
1610         find_backward_restart_pos(ds);
1611 }
1612 
1613 // Add the keyframe to the end of the index. Not all packets are actually added.
add_index_entry(struct demux_queue * queue,struct demux_packet * dp,double pts)1614 static void add_index_entry(struct demux_queue *queue, struct demux_packet *dp,
1615                             double pts)
1616 {
1617     struct demux_internal *in = queue->ds->in;
1618 
1619     assert(dp->keyframe && pts != MP_NOPTS_VALUE);
1620 
1621     if (queue->num_index > 0) {
1622         struct index_entry *last = &QUEUE_INDEX_ENTRY(queue, queue->num_index - 1);
1623         if (pts - last->pts < INDEX_STEP_SIZE)
1624             return;
1625     }
1626 
1627     if (queue->num_index == queue->index_size) {
1628         // Needs to honor power-of-2 requirement.
1629         size_t new_size = MPMAX(128, queue->index_size * 2);
1630         assert(!(new_size & (new_size - 1)));
1631         MP_DBG(in, "stream %d: resize index to %zu\n", queue->ds->index,
1632                new_size);
1633         // Note: we could tolerate allocation failure, and just discard the
1634         // entire index (and prevent the index from being recreated).
1635         MP_RESIZE_ARRAY(NULL, queue->index, new_size);
1636         size_t highest_index = queue->index0 + queue->num_index;
1637         for (size_t n = queue->index_size; n < highest_index; n++)
1638             queue->index[n] = queue->index[n - queue->index_size];
1639         in->total_bytes +=
1640             (new_size - queue->index_size) * sizeof(queue->index[0]);
1641         queue->index_size = new_size;
1642     }
1643 
1644     assert(queue->num_index < queue->index_size);
1645 
1646     queue->num_index += 1;
1647 
1648     QUEUE_INDEX_ENTRY(queue, queue->num_index - 1) = (struct index_entry){
1649         .pts = pts,
1650         .pkt = dp,
1651     };
1652 }
1653 
// Look for another cached range that begins inside the current range (i.e.
// the current range's end has grown past that range's start). If there is
// one, try to merge it into the current range so both become a single range.
// Whether or not the merge succeeds, the chosen range is cleared afterwards
// (on success its packets have been moved into the current range first).
// Requires in->current_range to be the last entry of in->ranges (asserted).
static void attempt_range_joining(struct demux_internal *in)
{
    struct demux_cached_range *current = in->current_range;
    struct demux_cached_range *next = NULL;
    double next_dist = INFINITY;

    assert(current && in->num_ranges > 0);
    assert(current == in->ranges[in->num_ranges - 1]);

    // Pick the candidate: among all other ranges that start at or after the
    // current range's start, take the one with the smallest positive overlap
    // (current end minus candidate start).
    for (int n = 0; n < in->num_ranges - 1; n++) {
        struct demux_cached_range *range = in->ranges[n];

        if (current->seek_start <= range->seek_start) {
            // This uses ">" to get some non-0 overlap.
            double dist = current->seek_end - range->seek_start;
            if (dist > 0 && dist < next_dist) {
                next = range;
                next_dist = dist;
            }
        }
    }

    if (!next)
        return;

    MP_VERBOSE(in, "going to join ranges %f-%f + %f-%f\n",
               current->seek_start, current->seek_end,
               next->seek_start, next->seek_end);

    // Try to find a join point, where packets obviously overlap. (It would be
    // better and faster to do this incrementally, but probably too complex.)
    // The current range can overlap arbitrarily with the next one, not only by
    // by the seek overlap, but for arbitrary packet readahead as well.
    // We also drop the overlapping packets (if joining fails, we discard the
    // entire next range anyway, so this does no harm).
    for (int n = 0; n < in->num_streams; n++) {
        struct demux_stream *ds = in->streams[n]->ds;

        struct demux_queue *q1 = current->streams[n];
        struct demux_queue *q2 = next->streams[n];

        // Packets can only be matched across ranges by DTS or by position.
        if (!ds->global_correct_pos && !ds->global_correct_dts) {
            MP_WARN(in, "stream %d: ranges unjoinable\n", n);
            goto failed;
        }

        struct demux_packet *end = q1->tail;
        bool join_point_found = !end; // no packets yet -> joining will work
        if (end) {
            // Drop packets from the head of q2 until the one equal to the
            // last packet of q1 is found (and dropped as well).
            while (q2->head) {
                struct demux_packet *dp = q2->head;

                // Some weird corner-case. We'd have to search the equivalent
                // packet in q1 to update it correctly. Better just give up.
                if (dp == q2->keyframe_latest) {
                    MP_VERBOSE(in, "stream %d: not enough keyframes for join\n", n);
                    goto failed;
                }

                if ((ds->global_correct_dts && dp->dts == end->dts) ||
                    (ds->global_correct_pos && dp->pos == end->pos))
                {
                    // Do some additional checks as a (imperfect) sanity check
                    // in case pos/dts are not "correct" across the ranges (we
                    // never actually check that).
                    if (dp->dts != end->dts || dp->pos != end->pos ||
                        dp->pts != end->pts)
                    {
                        MP_WARN(in,
                            "stream %d: non-repeatable demuxer behavior\n", n);
                        goto failed;
                    }

                    remove_head_packet(q2);
                    join_point_found = true;
                    break;
                }

                // This happens if the next range misses the end packet. For
                // normal streams (ds->eager==true), this is a failure to find
                // an overlap. For subtitles, this can mean the current_range
                // has a subtitle somewhere before the end of its range, and
                // next has another subtitle somewhere after the start of its
                // range.
                if ((ds->global_correct_dts && dp->dts > end->dts) ||
                    (ds->global_correct_pos && dp->pos > end->pos))
                    break;

                remove_head_packet(q2);
            }
        }

        // For enabled non-sparse streams, always require an overlap packet.
        if (ds->eager && !join_point_found) {
            MP_WARN(in, "stream %d: no join point found\n", n);
            goto failed;
        }
    }

    // Actually join the ranges. Now that we think it will work, mutate the
    // data associated with the current range.

    for (int n = 0; n < in->num_streams; n++) {
        struct demux_queue *q1 = current->streams[n];
        struct demux_queue *q2 = next->streams[n];

        struct demux_stream *ds = in->streams[n]->ds;
        assert(ds->queue == q1);

        // First new packet that is appended to the current range.
        struct demux_packet *join_point = q2->head;

        // Splice q2's remaining packet list onto the end of q1.
        if (q2->head) {
            if (q1->head) {
                q1->tail->next = q2->head;
            } else {
                q1->head = q2->head;
            }
            q1->tail = q2->tail;
        }

        // q1 takes over q2's end-of-queue state; the "correct" flags only
        // stay set if they were set for both queues.
        q1->seek_end = q2->seek_end;
        q1->correct_dts &= q2->correct_dts;
        q1->correct_pos &= q2->correct_pos;
        q1->last_pos = q2->last_pos;
        q1->last_dts = q2->last_dts;
        q1->last_ts = q2->last_ts;
        q1->keyframe_latest = q2->keyframe_latest;
        q1->is_eof = q2->is_eof;

        q1->last_pos_fixup = -1;

        // q2 no longer owns any packets.
        q2->head = q2->tail = NULL;
        q2->keyframe_first = NULL;
        q2->keyframe_latest = NULL;

        // A reader that had run out of packets continues at the join point.
        if (ds->selected && !ds->reader_head)
            ds->reader_head = join_point;
        ds->skip_to_keyframe = false;

        // Make the cum_pos values in all q2 packets continuous.
        // (Each packet's size is recovered as the difference to the next
        // packet's old cum_pos, read before that packet is rewritten.)
        for (struct demux_packet *dp = join_point; dp; dp = dp->next) {
            uint64_t next_pos = dp->next ? dp->next->cum_pos : q2->tail_cum_pos;
            uint64_t size = next_pos - dp->cum_pos;
            dp->cum_pos = q1->tail_cum_pos;
            q1->tail_cum_pos += size;
        }

        // And update the index with packets from q2.
        for (size_t i = 0; i < q2->num_index; i++) {
            struct index_entry *e = &QUEUE_INDEX_ENTRY(q2, i);
            add_index_entry(q1, e->pkt, e->pts);
        }
        free_index(q2);

        // For moving demuxer position.
        ds->refreshing = ds->selected;
    }

    // Hand the metadata entries of next over to current.
    for (int n = 0; n < next->num_metadata; n++) {
        MP_TARRAY_APPEND(current, current->metadata, current->num_metadata,
                         next->metadata[n]);
    }
    next->num_metadata = 0;

    update_seek_ranges(current);

    // Move demuxing position to after the current range.
    in->seeking = true;
    in->seek_flags = SEEK_HR;
    in->seek_pts = next->seek_end - 1.0;

    MP_VERBOSE(in, "ranges joined!\n");

    for (int n = 0; n < in->num_streams; n++)
        back_demux_see_packets(in->streams[n]->ds);

    // Reached on success (fall-through) and on failure alike: next is cleared
    // in both cases.
failed:
    clear_cached_range(in, next);
    free_empty_cached_ranges(in);
}
1837 
1838 // Compute the assumed first and last frame timestamp for keyframe range
1839 // starting at pkt. To get valid results, pkt->keyframe must be true, otherwise
1840 // nonsense will be returned.
1841 // Always sets *out_kf_min and *out_kf_max without reading them. Both are set
1842 // to NOPTS if there are no timestamps at all in the stream. *kf_max will not
1843 // be set to the actual end time of the decoded output, just the last frame
1844 // (audio will typically end up with kf_min==kf_max).
1845 // Either of out_kf_min and out_kf_max can be NULL, which discards the result.
1846 // Return the next keyframe packet after pkt, or NULL if there's none.
compute_keyframe_times(struct demux_packet * pkt,double * out_kf_min,double * out_kf_max)1847 static struct demux_packet *compute_keyframe_times(struct demux_packet *pkt,
1848                                                    double *out_kf_min,
1849                                                    double *out_kf_max)
1850 {
1851     struct demux_packet *start = pkt;
1852     double min = MP_NOPTS_VALUE;
1853     double max = MP_NOPTS_VALUE;
1854 
1855     while (pkt) {
1856         if (pkt->keyframe && pkt != start)
1857             break;
1858 
1859         double ts = MP_PTS_OR_DEF(pkt->pts, pkt->dts);
1860         if (pkt->segmented && ((pkt->start != MP_NOPTS_VALUE && ts < pkt->start) ||
1861                                (pkt->end != MP_NOPTS_VALUE && ts > pkt->end)))
1862             ts = MP_NOPTS_VALUE;
1863 
1864         min = MP_PTS_MIN(min, ts);
1865         max = MP_PTS_MAX(max, ts);
1866 
1867         pkt = pkt->next;
1868     }
1869 
1870     if (out_kf_min)
1871         *out_kf_min = min;
1872     if (out_kf_max)
1873         *out_kf_max = max;
1874     return pkt;
1875 }
1876 
1877 // Determine seekable range when a packet is added. If dp==NULL, treat it as
1878 // EOF (i.e. closes the current block).
1879 // This has to deal with a number of corner cases, such as demuxers potentially
1880 // starting output at non-keyframes.
1881 // Can join seek ranges, which messes with in->current_range and all.
adjust_seek_range_on_packet(struct demux_stream * ds,struct demux_packet * dp)1882 static void adjust_seek_range_on_packet(struct demux_stream *ds,
1883                                         struct demux_packet *dp)
1884 {
1885     struct demux_queue *queue = ds->queue;
1886 
1887     if (!ds->in->seekable_cache)
1888         return;
1889 
1890     bool new_eof = !dp;
1891     bool update_ranges = queue->is_eof != new_eof;
1892     queue->is_eof = new_eof;
1893 
1894     if (!dp || dp->keyframe) {
1895         if (queue->keyframe_latest) {
1896             double kf_min, kf_max;
1897             compute_keyframe_times(queue->keyframe_latest, &kf_min, &kf_max);
1898 
1899             if (kf_min != MP_NOPTS_VALUE) {
1900                 add_index_entry(queue, queue->keyframe_latest, kf_min);
1901 
1902                 // Initialize the queue's start if it's unset.
1903                 if (queue->seek_start == MP_NOPTS_VALUE) {
1904                     update_ranges = true;
1905                     queue->seek_start = kf_min + ds->sh->seek_preroll;
1906                 }
1907             }
1908 
1909             if (kf_max != MP_NOPTS_VALUE &&
1910                 (queue->seek_end == MP_NOPTS_VALUE || kf_max > queue->seek_end))
1911             {
1912                 // If the queue was past the current range's end even before
1913                 // this update, it means _other_ streams are not there yet,
1914                 // and the seek range doesn't need to be updated. This means
1915                 // if the _old_ queue->seek_end was already after the range end,
1916                 // then the new seek_end won't extend the range either.
1917                 if (queue->range->seek_end == MP_NOPTS_VALUE ||
1918                     queue->seek_end <= queue->range->seek_end)
1919                 {
1920                     update_ranges = true;
1921                 }
1922 
1923                 queue->seek_end = kf_max;
1924             }
1925         }
1926 
1927         queue->keyframe_latest = dp;
1928     }
1929 
1930     // Adding a sparse packet never changes the seek range.
1931     if (update_ranges && ds->eager) {
1932         update_seek_ranges(queue->range);
1933         attempt_range_joining(ds->in);
1934     }
1935 }
1936 
recorder_create(struct demux_internal * in,const char * dst)1937 static struct mp_recorder *recorder_create(struct demux_internal *in,
1938                                            const char *dst)
1939 {
1940     struct sh_stream **streams = NULL;
1941     int num_streams = 0;
1942     for (int n = 0; n < in->num_streams; n++) {
1943         struct sh_stream *stream = in->streams[n];
1944         if (stream->ds->selected)
1945             MP_TARRAY_APPEND(NULL, streams, num_streams, stream);
1946     }
1947 
1948     struct demuxer *demuxer = in->d_thread;
1949     struct demux_attachment **attachments = talloc_array(NULL, struct demux_attachment*, demuxer->num_attachments);
1950     for (int n = 0; n < demuxer->num_attachments; n++) {
1951         attachments[n] = &demuxer->attachments[n];
1952     }
1953 
1954     struct mp_recorder *res = mp_recorder_create(in->d_thread->global, dst,
1955                                                  streams, num_streams,
1956                                                  attachments, demuxer->num_attachments);
1957     talloc_free(streams);
1958     talloc_free(attachments);
1959     return res;
1960 }
1961 
write_dump_packet(struct demux_internal * in,struct demux_packet * dp)1962 static void write_dump_packet(struct demux_internal *in, struct demux_packet *dp)
1963 {
1964     assert(in->dumper);
1965     assert(in->dumper_status == CONTROL_TRUE);
1966 
1967     struct mp_recorder_sink *sink =
1968         mp_recorder_get_sink(in->dumper, in->streams[dp->stream]);
1969     if (sink) {
1970         mp_recorder_feed_packet(sink, dp);
1971     } else {
1972         MP_ERR(in, "New stream appeared; stopping recording.\n");
1973         in->dumper_status = CONTROL_ERROR;
1974     }
1975 }
1976 
record_packet(struct demux_internal * in,struct demux_packet * dp)1977 static void record_packet(struct demux_internal *in, struct demux_packet *dp)
1978 {
1979     // (should preferably be outside of the lock)
1980     if (in->enable_recording && !in->recorder &&
1981         in->opts->record_file && in->opts->record_file[0])
1982     {
1983         // Later failures shouldn't make it retry and overwrite the previously
1984         // recorded file.
1985         in->enable_recording = false;
1986 
1987         in->recorder = recorder_create(in, in->opts->record_file);
1988         if (!in->recorder)
1989             MP_ERR(in, "Disabling recording.\n");
1990     }
1991 
1992     if (in->recorder) {
1993         struct mp_recorder_sink *sink =
1994             mp_recorder_get_sink(in->recorder, in->streams[dp->stream]);
1995         if (sink) {
1996             mp_recorder_feed_packet(sink, dp);
1997         } else {
1998             MP_ERR(in, "New stream appeared; stopping recording.\n");
1999             mp_recorder_destroy(in->recorder);
2000             in->recorder = NULL;
2001         }
2002     }
2003 
2004     if (in->dumper_status == CONTROL_OK)
2005         write_dump_packet(in, dp);
2006 }
2007 
// Append the packet dp to the queue of the given stream. Takes ownership of
// dp: it is either linked into the stream's queue or freed.
// The packet is freed without being queued if: stream is NULL, the packet is
// empty, the demuxer was cancelled, the stream is not selected, a seek is
// pending, the stream is an attached picture, or the stream is still
// "refreshing" (re-reading packets it already had).
// Presumably the caller must hold in->lock, as the name suggests - confirm
// against callers.
static void add_packet_locked(struct sh_stream *stream, demux_packet_t *dp)
{
    struct demux_stream *ds = stream ? stream->ds : NULL;
    // The !ds test has to come first: the cancel check dereferences ds->in,
    // so without it a NULL stream would crash instead of being rejected.
    if (!ds || !dp->len || demux_cancel_test(ds->in->d_thread)) {
        talloc_free(dp);
        return;
    }

    assert(dp->stream == stream->index);
    assert(!dp->next);

    struct demux_internal *in = ds->in;

    in->after_seek = false;
    in->after_seek_to_start = false;

    // Track the demuxer position: prefer dts, fall back to pts, and clamp to
    // the segment end for segmented packets.
    double ts = dp->dts == MP_NOPTS_VALUE ? dp->pts : dp->dts;
    if (dp->segmented)
        ts = MP_PTS_MIN(ts, dp->end);

    if (ts != MP_NOPTS_VALUE)
        in->demux_ts = ts;

    struct demux_queue *queue = ds->queue;

    bool drop = !ds->selected || in->seeking || ds->sh->attached_picture;

    if (!drop) {
        // If libavformat splits packets, some packets will have pos unset, so
        // make up one based on the first packet => makes refresh seeks work.
        if ((dp->pos < 0 || dp->pos == queue->last_pos_fixup) &&
            !dp->keyframe && queue->last_pos_fixup >= 0)
            dp->pos = queue->last_pos_fixup + 1;
        queue->last_pos_fixup = dp->pos;
    }

    if (!drop && ds->refreshing) {
        // Resume reading once the old position was reached (i.e. we start
        // returning packets where we left off before the refresh).
        // If it's the same position, drop, but continue normally next time.
        if (queue->correct_dts) {
            ds->refreshing = dp->dts < queue->last_dts;
        } else if (queue->correct_pos) {
            ds->refreshing = dp->pos < queue->last_pos;
        } else {
            ds->refreshing = false; // should not happen
            MP_WARN(in, "stream %d: demux refreshing failed\n", ds->index);
        }
        drop = true;
    }

    if (drop) {
        talloc_free(dp);
        return;
    }

    record_packet(in, dp);

    // With the disk cache enabled, the packet payload is written out and only
    // the position in the cache is kept; if the write fails (negative return)
    // the packet stays in memory.
    if (in->cache && in->opts->disk_cache) {
        int64_t pos = demux_cache_write(in->cache, dp);
        if (pos >= 0) {
            demux_packet_unref_contents(dp);
            dp->is_cached = true;
            dp->cached_data.pos = pos;
        }
    }

    // pos/dts count as "correct" only while they are set and strictly
    // increasing; once violated, the flags stay cleared.
    queue->correct_pos &= dp->pos >= 0 && dp->pos > queue->last_pos;
    queue->correct_dts &= dp->dts != MP_NOPTS_VALUE && dp->dts > queue->last_dts;
    queue->last_pos = dp->pos;
    queue->last_dts = dp->dts;
    ds->global_correct_pos &= queue->correct_pos;
    ds->global_correct_dts &= queue->correct_dts;

    // (keep in mind that even if the reader went out of data, the queue is not
    // necessarily empty due to the backbuffer)
    if (!ds->reader_head && (!ds->skip_to_keyframe || dp->keyframe)) {
        ds->reader_head = dp;
        ds->skip_to_keyframe = false;
    }

    // Cache size accounting; cum_pos is the running byte offset of the packet
    // within its queue.
    size_t bytes = demux_packet_estimate_total_size(dp);
    in->total_bytes += bytes;
    dp->cum_pos = queue->tail_cum_pos;
    queue->tail_cum_pos += bytes;

    if (queue->tail) {
        // next packet in stream
        queue->tail->next = dp;
        queue->tail = dp;
    } else {
        // first packet in stream
        queue->head = queue->tail = dp;
    }

    if (!ds->ignore_eof) {
        // obviously not true anymore
        ds->eof = false;
        in->eof = false;
    }

    // For video, PTS determination is not trivial, but for other media types
    // distinguishing PTS and DTS is not useful.
    if (stream->type != STREAM_VIDEO && dp->pts == MP_NOPTS_VALUE)
        dp->pts = dp->dts;

    // last_ts follows the timestamp forward, and also accepts a backward jump
    // of more than 10 (in timestamp units).
    if (ts != MP_NOPTS_VALUE && (ts > queue->last_ts || ts + 10 < queue->last_ts))
        queue->last_ts = ts;
    if (ds->base_ts == MP_NOPTS_VALUE)
        ds->base_ts = queue->last_ts;

    const char *num_pkts = queue->head == queue->tail ? "1" : ">1";
    uint64_t fw_bytes = get_foward_buffered_bytes(ds);
    // (%zu, not %zd: the argument is cast to the unsigned size_t.)
    MP_TRACE(in, "append packet to %s: size=%zu pts=%f dts=%f pos=%"PRIi64" "
             "[num=%s size=%zu]\n", stream_type_name(stream->type),
             dp->len, dp->pts, dp->dts, dp->pos, num_pkts, (size_t)fw_bytes);

    adjust_seek_range_on_packet(ds, dp);

    // May need to reduce backward cache.
    prune_old_packets(in);

    // Possibly update duration based on highest TS demuxed (but ignore subs).
    if (stream->type != STREAM_SUB) {
        if (dp->segmented)
            ts = MP_PTS_MIN(ts, dp->end);
        if (ts > in->highest_av_pts) {
            in->highest_av_pts = ts;
            double duration = in->highest_av_pts - in->d_thread->start_time;
            if (duration > in->d_thread->duration) {
                in->d_thread->duration = duration;
                // (Don't wakeup user thread, would be too noisy.)
                in->events |= DEMUX_EVENT_DURATION;
                in->duration = duration;
            }
        }
    }

    // Don't process the packet further if it's skipped by the previous seek
    // (see reader_head check/assignment above).
    if (!ds->reader_head)
        return;

    back_demux_see_packets(ds);

    wakeup_ds(ds);
}
2155 
mark_stream_eof(struct demux_stream * ds)2156 static void mark_stream_eof(struct demux_stream *ds)
2157 {
2158     if (!ds->eof) {
2159         ds->eof = true;
2160         adjust_seek_range_on_packet(ds, NULL);
2161         back_demux_see_packets(ds);
2162         wakeup_ds(ds);
2163     }
2164 }
2165 
lazy_stream_needs_wait(struct demux_stream * ds)2166 static bool lazy_stream_needs_wait(struct demux_stream *ds)
2167 {
2168     struct demux_internal *in = ds->in;
2169     // Attempt to read until force_read_until was reached, or reading has
2170     // stopped for some reason (true EOF, queue overflow).
2171     return !ds->eager && !ds->reader_head && !in->back_demuxing &&
2172            !in->eof && ds->force_read_until != MP_NOPTS_VALUE &&
2173            (in->demux_ts == MP_NOPTS_VALUE ||
2174             in->demux_ts <= ds->force_read_until);
2175 }
2176 
2177 // Returns true if there was "progress" (lock was released temporarily).
read_packet(struct demux_internal * in)2178 static bool read_packet(struct demux_internal *in)
2179 {
2180     bool was_reading = in->reading;
2181     in->reading = false;
2182 
2183     if (!was_reading || in->blocked || demux_cancel_test(in->d_thread))
2184         return false;
2185 
2186     // Check if we need to read a new packet. We do this if all queues are below
2187     // the minimum, or if a stream explicitly needs new packets. Also includes
2188     // safe-guards against packet queue overflow.
2189     bool read_more = false, prefetch_more = false, refresh_more = false;
2190     uint64_t total_fw_bytes = 0;
2191     for (int n = 0; n < in->num_streams; n++) {
2192         struct demux_stream *ds = in->streams[n]->ds;
2193         if (ds->eager) {
2194             read_more |= !ds->reader_head;
2195             if (in->back_demuxing)
2196                 read_more |= ds->back_restarting || ds->back_resuming;
2197         } else {
2198             if (lazy_stream_needs_wait(ds)) {
2199                 read_more = true;
2200             } else {
2201                 mark_stream_eof(ds); // let playback continue
2202             }
2203         }
2204         refresh_more |= ds->refreshing;
2205         if (ds->eager && ds->queue->last_ts != MP_NOPTS_VALUE &&
2206             in->min_secs > 0 && ds->base_ts != MP_NOPTS_VALUE &&
2207             ds->queue->last_ts >= ds->base_ts &&
2208             !in->back_demuxing)
2209             prefetch_more |= ds->queue->last_ts - ds->base_ts < in->min_secs;
2210         total_fw_bytes += get_foward_buffered_bytes(ds);
2211     }
2212 
2213     MP_TRACE(in, "bytes=%zd, read_more=%d prefetch_more=%d, refresh_more=%d\n",
2214              (size_t)total_fw_bytes, read_more, prefetch_more, refresh_more);
2215     if (total_fw_bytes >= in->max_bytes) {
2216         // if we hit the limit just by prefetching, simply stop prefetching
2217         if (!read_more)
2218             return false;
2219         if (!in->warned_queue_overflow) {
2220             in->warned_queue_overflow = true;
2221             MP_WARN(in, "Too many packets in the demuxer packet queues:\n");
2222             for (int n = 0; n < in->num_streams; n++) {
2223                 struct demux_stream *ds = in->streams[n]->ds;
2224                 if (ds->selected) {
2225                     size_t num_pkts = 0;
2226                     for (struct demux_packet *dp = ds->reader_head;
2227                          dp; dp = dp->next)
2228                         num_pkts++;
2229                     uint64_t fw_bytes = get_foward_buffered_bytes(ds);
2230                     MP_WARN(in, "  %s/%d: %zd packets, %zd bytes%s%s\n",
2231                             stream_type_name(ds->type), n,
2232                             num_pkts, (size_t)fw_bytes,
2233                             ds->eager ? "" : " (lazy)",
2234                             ds->refreshing ? " (refreshing)" : "");
2235                 }
2236             }
2237             if (in->back_demuxing)
2238                 MP_ERR(in, "Backward playback is likely stuck/broken now.\n");
2239         }
2240         for (int n = 0; n < in->num_streams; n++) {
2241             struct demux_stream *ds = in->streams[n]->ds;
2242             if (!ds->reader_head)
2243                 mark_stream_eof(ds);
2244         }
2245         return false;
2246     }
2247 
2248     if (!read_more && !prefetch_more && !refresh_more)
2249         return false;
2250 
2251     if (in->after_seek_to_start) {
2252         for (int n = 0; n < in->num_streams; n++) {
2253             struct demux_stream *ds = in->streams[n]->ds;
2254             in->current_range->streams[n]->is_bof =
2255                 ds->selected && !ds->refreshing;
2256         }
2257     }
2258 
2259     // Actually read a packet. Drop the lock while doing so, because waiting
2260     // for disk or network I/O can take time.
2261     in->reading = true;
2262     in->after_seek = false;
2263     in->after_seek_to_start = false;
2264     pthread_mutex_unlock(&in->lock);
2265 
2266     struct demuxer *demux = in->d_thread;
2267     struct demux_packet *pkt = NULL;
2268 
2269     bool eof = true;
2270     if (demux->desc->read_packet && !demux_cancel_test(demux))
2271         eof = !demux->desc->read_packet(demux, &pkt);
2272 
2273     pthread_mutex_lock(&in->lock);
2274     update_cache(in);
2275 
2276     if (pkt) {
2277         assert(pkt->stream >= 0 && pkt->stream < in->num_streams);
2278         add_packet_locked(in->streams[pkt->stream], pkt);
2279     }
2280 
2281     if (!in->seeking) {
2282         if (eof) {
2283             for (int n = 0; n < in->num_streams; n++)
2284                 mark_stream_eof(in->streams[n]->ds);
2285             // If we had EOF previously, then don't wakeup (avoids wakeup loop)
2286             if (!in->eof) {
2287                 if (in->wakeup_cb)
2288                     in->wakeup_cb(in->wakeup_cb_ctx);
2289                 pthread_cond_signal(&in->wakeup);
2290                 MP_VERBOSE(in, "EOF reached.\n");
2291             }
2292         }
2293         in->eof = eof;
2294         in->reading = !eof;
2295     }
2296     return true;
2297 }
2298 
prune_old_packets(struct demux_internal * in)2299 static void prune_old_packets(struct demux_internal *in)
2300 {
2301     assert(in->current_range == in->ranges[in->num_ranges - 1]);
2302 
2303     // It's not clear what the ideal way to prune old packets is. For now, we
2304     // prune the oldest packet runs, as long as the total cache amount is too
2305     // big.
2306     while (1) {
2307         uint64_t fw_bytes = 0;
2308         for (int n = 0; n < in->num_streams; n++) {
2309             struct demux_stream *ds = in->streams[n]->ds;
2310             fw_bytes += get_foward_buffered_bytes(ds);
2311         }
2312         uint64_t max_avail = in->max_bytes_bw;
2313         // Backward cache (if enabled at all) can use unused forward cache.
2314         // Still leave 1 byte free, so the read_packet logic doesn't get stuck.
2315         if (max_avail && in->max_bytes > (fw_bytes + 1) && in->opts->donate_fw)
2316             max_avail += in->max_bytes - (fw_bytes + 1);
2317         if (in->total_bytes - fw_bytes <= max_avail)
2318             break;
2319 
2320         // (Start from least recently used range.)
2321         struct demux_cached_range *range = in->ranges[0];
2322         double earliest_ts = MP_NOPTS_VALUE;
2323         struct demux_stream *earliest_stream = NULL;
2324 
2325         for (int n = 0; n < range->num_streams; n++) {
2326             struct demux_queue *queue = range->streams[n];
2327             struct demux_stream *ds = queue->ds;
2328 
2329             if (queue->head && queue->head != ds->reader_head) {
2330                 struct demux_packet *dp = queue->head;
2331                 double ts = queue->seek_start;
2332                 // If the ts is NOPTS, the queue has no retainable packets, so
2333                 // delete them all. This code is not run when there's enough
2334                 // free space, so normally the queue gets the chance to build up.
2335                 bool prune_always =
2336                     !in->seekable_cache || ts == MP_NOPTS_VALUE || !dp->keyframe;
2337                 if (prune_always || !earliest_stream || ts < earliest_ts) {
2338                     earliest_ts = ts;
2339                     earliest_stream = ds;
2340                     if (prune_always)
2341                         break;
2342                 }
2343             }
2344         }
2345 
2346         // In some cases (like when the seek index became huge), there aren't
2347         // any backwards packets, even if the total cache size is exceeded.
2348         if (!earliest_stream)
2349             break;
2350 
2351         struct demux_stream *ds = earliest_stream;
2352         struct demux_queue *queue = range->streams[ds->index];
2353 
2354         bool non_kf_prune = queue->head && !queue->head->keyframe;
2355         bool kf_was_pruned = false;
2356 
2357         while (queue->head && queue->head != ds->reader_head) {
2358             if (queue->head->keyframe) {
2359                 // If the cache is seekable, only delete until up the next
2360                 // keyframe. This is not always efficient, but ensures we
2361                 // prune all streams fairly.
2362                 // Also, if the first packet was _not_ a keyframe, we want it
2363                 // to remove all preceding non-keyframe packets first, before
2364                 // re-evaluating what to prune next.
2365                 if ((kf_was_pruned || non_kf_prune) && in->seekable_cache)
2366                     break;
2367                 kf_was_pruned = true;
2368             }
2369 
2370             remove_head_packet(queue);
2371         }
2372 
2373         // Need to update the seekable time range.
2374         if (kf_was_pruned) {
2375             assert(!queue->keyframe_first); // it was just deleted, supposedly
2376 
2377             queue->keyframe_first = queue->head;
2378             // (May happen if reader_head stopped pruning the range, and there's
2379             // no next range.)
2380             while (queue->keyframe_first && !queue->keyframe_first->keyframe)
2381                 queue->keyframe_first = queue->keyframe_first->next;
2382 
2383             if (queue->seek_start != MP_NOPTS_VALUE)
2384                 queue->last_pruned = queue->seek_start;
2385 
2386             double kf_min;
2387             compute_keyframe_times(queue->keyframe_first, &kf_min, NULL);
2388 
2389             bool update_range = true;
2390 
2391             queue->seek_start = kf_min;
2392 
2393             if (queue->seek_start != MP_NOPTS_VALUE) {
2394                 queue->seek_start += ds->sh->seek_preroll;
2395 
2396                 // Don't need to update if the new start is still before the
2397                 // range's start (or if the range was undefined anyway).
2398                 if (range->seek_start == MP_NOPTS_VALUE ||
2399                     queue->seek_start <= range->seek_start)
2400                 {
2401                     update_range = false;
2402                 }
2403             }
2404 
2405             if (update_range)
2406                 update_seek_ranges(range);
2407         }
2408 
2409         if (range != in->current_range && range->seek_start == MP_NOPTS_VALUE)
2410             free_empty_cached_ranges(in);
2411     }
2412 }
2413 
execute_trackswitch(struct demux_internal * in)2414 static void execute_trackswitch(struct demux_internal *in)
2415 {
2416     in->tracks_switched = false;
2417 
2418     bool any_selected = false;
2419     for (int n = 0; n < in->num_streams; n++)
2420         any_selected |= in->streams[n]->ds->selected;
2421 
2422     pthread_mutex_unlock(&in->lock);
2423 
2424     if (in->d_thread->desc->switched_tracks)
2425         in->d_thread->desc->switched_tracks(in->d_thread);
2426 
2427     pthread_mutex_lock(&in->lock);
2428 }
2429 
// Carry out the seek described by in->seek_pts / in->seek_flags by calling
// the demuxer implementation's optional seek callback.
// Entered and left with in->lock held; the lock is released while the
// callback runs.
static void execute_seek(struct demux_internal *in)
{
    // Take local copies of the request before the lock is released below.
    int flags = in->seek_flags;
    double pts = in->seek_pts;
    in->eof = false;
    in->seeking = false;
    // Set to the target for the duration of the low-level seek; reset to
    // MP_NOPTS_VALUE at the end of this function.
    in->seeking_in_progress = pts;
    in->demux_ts = MP_NOPTS_VALUE;
    in->low_level_seeks += 1;
    in->after_seek = true;
    // True only for a plain (non-forward, non-factor) seek whose target is at
    // or before the demuxer's start time.
    in->after_seek_to_start =
        !(flags & (SEEK_FORWARD | SEEK_FACTOR)) &&
        pts <= in->d_thread->start_time;

    for (int n = 0; n < in->num_streams; n++)
        in->streams[n]->ds->queue->last_pos_fixup = -1;

    if (in->recorder)
        mp_recorder_mark_discontinuity(in->recorder);

    pthread_mutex_unlock(&in->lock);

    MP_VERBOSE(in, "execute seek (to %f flags %d)\n", pts, flags);

    if (in->d_thread->desc->seek)
        in->d_thread->desc->seek(in->d_thread, pts, flags);

    MP_VERBOSE(in, "seek done\n");

    pthread_mutex_lock(&in->lock);

    in->seeking_in_progress = MP_NOPTS_VALUE;
}
2463 
update_opts(struct demux_internal * in)2464 static void update_opts(struct demux_internal *in)
2465 {
2466     struct demux_opts *opts = in->opts;
2467 
2468     in->min_secs = opts->min_secs;
2469     in->max_bytes = opts->max_bytes;
2470     in->max_bytes_bw = opts->max_bytes_bw;
2471 
2472     int seekable = opts->seekable_cache;
2473     bool is_streaming = in->d_thread->is_streaming;
2474     bool use_cache = is_streaming;
2475     if (opts->enable_cache >= 0)
2476         use_cache = opts->enable_cache == 1;
2477 
2478     if (use_cache) {
2479         in->min_secs = MPMAX(in->min_secs, opts->min_secs_cache);
2480         if (seekable < 0)
2481             seekable = 1;
2482     }
2483     in->seekable_cache = seekable == 1;
2484     in->using_network_cache_opts = is_streaming && use_cache;
2485 
2486     if (!in->seekable_cache)
2487         in->max_bytes_bw = 0;
2488 
2489     if (!in->can_cache) {
2490         in->seekable_cache = false;
2491         in->min_secs = 0;
2492         in->max_bytes = 1;
2493         in->max_bytes_bw = 0;
2494         in->using_network_cache_opts = false;
2495     }
2496 
2497     if (in->seekable_cache && opts->disk_cache && !in->cache) {
2498         in->cache = demux_cache_create(in->global, in->log);
2499         if (!in->cache)
2500             MP_ERR(in, "Failed to create file cache.\n");
2501     }
2502 
2503     // The filename option really decides whether recording should be active.
2504     // So if the filename changes, act upon it.
2505     char *old = in->record_filename ? in->record_filename : "";
2506     char *new = opts->record_file ? opts->record_file : "";
2507     if (strcmp(old, new) != 0) {
2508         if (in->recorder) {
2509             MP_WARN(in, "Stopping recording.\n");
2510             mp_recorder_destroy(in->recorder);
2511             in->recorder = NULL;
2512         }
2513         talloc_free(in->record_filename);
2514         in->record_filename = talloc_strdup(in, opts->record_file);
2515         // Note: actual recording only starts once packets are read. It may be
2516         // important to delay creating in->recorder to that point, because the
2517         // demuxer might detect more streams until finding the first packet.
2518         in->enable_recording = in->can_record;
2519     }
2520 
2521     // In case the cache was reduced in size.
2522     prune_old_packets(in);
2523 
2524     // In case the seekable cache was disabled.
2525     free_empty_cached_ranges(in);
2526 }
2527 
// Make demuxing progress. Return whether progress was made.
// Pending option changes are applied first (this alone does not count as
// progress). After that the checks run in the order written and the first one
// that applies is executed, returning true; the caller is expected to call
// again to handle whatever is next.
// All callers visible in this file hold in->lock; several of the actions
// below release and re-acquire it internally.
static bool thread_work(struct demux_internal *in)
{
    if (m_config_cache_update(in->opts_cache))
        update_opts(in);
    if (in->tracks_switched) {
        execute_trackswitch(in);
        return true;
    }
    if (in->need_back_seek) {
        perform_backward_seek(in);
        return true;
    }
    if (in->back_any_need_recheck) {
        check_backward_seek(in);
        return true;
    }
    if (in->seeking) {
        execute_seek(in);
        return true;
    }
    if (read_packet(in))
        return true; // read_packet unlocked, so recheck conditions
    // Nothing else to do: run the periodic cache update once it is due.
    if (mp_time_us() >= in->next_cache_update) {
        update_cache(in);
        return true;
    }
    return false;
}
2557 
// Entry point of the demuxer thread. pctx is the struct demux_internal.
// Holds in->lock for its whole lifetime except while waiting on in->wakeup
// and during the asynchronous shutdown call. Always returns NULL.
static void *demux_thread(void *pctx)
{
    struct demux_internal *in = pctx;
    mpthread_set_name("demux");
    pthread_mutex_lock(&in->lock);

    stats_register_thread_cputime(in->stats, "thread");

    while (!in->thread_terminate) {
        // Keep working as long as progress is being made.
        if (thread_work(in))
            continue;
        // Idle: signal in->wakeup, then sleep on it until signaled or until
        // the next cache update is due.
        pthread_cond_signal(&in->wakeup);
        struct timespec until = mp_time_us_to_timespec(in->next_cache_update);
        pthread_cond_timedwait(&in->wakeup, &in->lock, &until);
    }

    if (in->shutdown_async) {
        // demux_shutdown() is called without the lock held.
        pthread_mutex_unlock(&in->lock);
        demux_shutdown(in);
        pthread_mutex_lock(&in->lock);
        in->shutdown_async = false;
        // Invoke the wakeup callback (if set) after shutdown has finished.
        if (in->wakeup_cb)
            in->wakeup_cb(in->wakeup_cb_ctx);
    }

    stats_unregister_thread(in->stats, "thread");

    pthread_mutex_unlock(&in->lock);
    return NULL;
}
2588 
2589 // Low-level part of dequeueing a packet.
advance_reader_head(struct demux_stream * ds)2590 static struct demux_packet *advance_reader_head(struct demux_stream *ds)
2591 {
2592     struct demux_packet *pkt = ds->reader_head;
2593     if (!pkt)
2594         return NULL;
2595 
2596     ds->reader_head = pkt->next;
2597 
2598     ds->last_ret_pos = pkt->pos;
2599     ds->last_ret_dts = pkt->dts;
2600 
2601     return pkt;
2602 }
2603 
2604 // Return a newly allocated new packet. The pkt parameter may be either a
2605 // in-memory packet (then a new reference is made), or a reference to
2606 // packet in the disk cache (then the packet is read from disk).
read_packet_from_cache(struct demux_internal * in,struct demux_packet * pkt)2607 static struct demux_packet *read_packet_from_cache(struct demux_internal *in,
2608                                                    struct demux_packet *pkt)
2609 {
2610     if (!pkt)
2611         return NULL;
2612 
2613     if (pkt->is_cached) {
2614         assert(in->cache);
2615         struct demux_packet *meta = pkt;
2616         pkt = demux_cache_read(in->cache, pkt->cached_data.pos);
2617         if (pkt) {
2618             demux_packet_copy_attribs(pkt, meta);
2619         } else {
2620             MP_ERR(in, "Failed to retrieve packet from cache.\n");
2621         }
2622     } else {
2623         // The returned packet is mutated etc. and will be owned by the user.
2624         pkt = demux_copy_packet(pkt);
2625     }
2626 
2627     return pkt;
2628 }
2629 
// Try to take the next packet of stream ds for the reader.
// min_pts is stored in ds->force_read_until (see the non-eager handling
// below). The demuxer's reading flag is enabled (and the demuxer thread
// signaled) if it was not set yet.
// Returns:
//   < 0: EOF was reached, *res is not set
//  == 0: no new packet yet, wait, *res is not set
//   > 0: new packet is moved to *res
static int dequeue_packet(struct demux_stream *ds, double min_pts,
                          struct demux_packet **res)
{
    struct demux_internal *in = ds->in;

    if (!ds->selected)
        return -1;
    if (in->blocked)
        return 0;

    // Attached pictures are returned exactly once (as a copy), then EOF.
    if (ds->sh->attached_picture) {
        ds->eof = true;
        if (ds->attached_picture_added)
            return -1;
        ds->attached_picture_added = true;
        struct demux_packet *pkt = demux_copy_packet(ds->sh->attached_picture);
        if (!pkt)
            abort();
        pkt->stream = ds->sh->index;
        *res = pkt;
        return 1;
    }

    if (!in->reading && (!in->eof || in->opts->force_retry_eof)) {
        in->reading = true; // enable demuxer thread prefetching
        pthread_cond_signal(&in->wakeup);
    }

    ds->force_read_until = min_pts;

    // A backward-demuxing restart/resume is still pending for this stream.
    if (ds->back_resuming || ds->back_restarting) {
        assert(in->back_demuxing);
        return 0;
    }

    bool eof = !ds->reader_head && ds->eof;

    if (in->back_demuxing) {
        // Subtitles not supported => EOF.
        if (!ds->eager)
            return -1;

        // Next keyframe (or EOF) was reached => step back.
        if (ds->back_range_started && !ds->back_range_count &&
            ((ds->reader_head && ds->reader_head->keyframe) || eof))
        {
            ds->back_restarting = true;
            ds->back_restart_eof = false;
            ds->back_restart_next = false;

            find_backward_restart_pos(ds);

            // If the restart is still pending after the call, the reader
            // has to wait.
            if (ds->back_restarting)
                return 0;
        }

        eof = ds->back_range_count < 0;
    }

    ds->need_wakeup = !ds->reader_head;
    if (!ds->reader_head || eof) {
        if (!ds->eager) {
            // Non-eager streams temporarily return EOF. If they returned 0,
            // the reader would have to wait for new packets, which does not
            // make sense due to the sparseness and passiveness of non-eager
            // streams.
            // Unless the min_pts feature is used: then EOF is only signaled
            // if read-ahead went above min_pts.
            if (!lazy_stream_needs_wait(ds))
                ds->eof = eof = true;
        }
        return eof ? -1 : 0;
    }

    struct demux_packet *pkt = advance_reader_head(ds);
    assert(pkt);
    pkt = read_packet_from_cache(in, pkt);
    // Note: the reader head was already advanced above, so on failure this
    // packet is not retried.
    if (!pkt)
        return 0;

    if (in->back_demuxing) {
        if (pkt->keyframe) {
            assert(ds->back_range_count > 0);
            ds->back_range_count -= 1;
            if (ds->back_range_preroll >= 0)
                ds->back_range_preroll -= 1;
        }

        if (ds->back_range_preroll >= 0)
            pkt->back_preroll = true;

        // The first packet returned for a backward range is flagged.
        if (!ds->back_range_started) {
            pkt->back_restart = true;
            ds->back_range_started = true;
        }
    }

    double ts = MP_PTS_OR_DEF(pkt->dts, pkt->pts);
    if (ts != MP_NOPTS_VALUE)
        ds->base_ts = ts;

    if (pkt->keyframe && ts != MP_NOPTS_VALUE) {
        // Update bitrate - only at keyframe points, because we use the
        // (possibly) reordered packet timestamps instead of realtime.
        double d = ts - ds->last_br_ts;
        if (ds->last_br_ts == MP_NOPTS_VALUE || d < 0) {
            // No reference point yet, or timestamps went backwards: restart
            // the measurement.
            ds->bitrate = -1;
            ds->last_br_ts = ts;
            ds->last_br_bytes = 0;
        } else if (d >= 0.5) { // a window of at least 500ms for UI purposes
            ds->bitrate = ds->last_br_bytes / d;
            ds->last_br_ts = ts;
            ds->last_br_bytes = 0;
        }
    }
    ds->last_br_bytes += pkt->len;

    // This implies this function is actually called from "the" user thread.
    if (pkt->pos >= in->d_user->filepos)
        in->d_user->filepos = pkt->pos;
    in->d_user->filesize = in->stream_size;

    // Apply the timestamp offset to the packet handed to the user.
    pkt->pts = MP_ADD_PTS(pkt->pts, in->ts_offset);
    pkt->dts = MP_ADD_PTS(pkt->dts, in->ts_offset);

    if (pkt->segmented) {
        pkt->start = MP_ADD_PTS(pkt->start, in->ts_offset);
        pkt->end = MP_ADD_PTS(pkt->end, in->ts_offset);
    }

    prune_old_packets(in);
    *res = pkt;
    return 1;
}
2768 
// Poll the demuxer queue, and if there's a packet, return it. Otherwise, just
// make the demuxer thread read packets for this stream, and if there's at
// least one packet, call the wakeup callback.
// This enables readahead if it wasn't yet (except for interleaved subtitles).
// Equivalent to demux_read_packet_async_until() with min_pts set to
// MP_NOPTS_VALUE.
// Returns:
//   < 0: EOF was reached, *out_pkt=NULL
//  == 0: no new packet yet, but maybe later, *out_pkt=NULL
//   > 0: new packet read, *out_pkt is set
// Note: when reading interleaved subtitles, the demuxer won't try to forcibly
// read ahead to get the next subtitle packet (as the next packet could be
// minutes away). In this situation, this function will just return -1.
int demux_read_packet_async(struct sh_stream *sh, struct demux_packet **out_pkt)
{
    return demux_read_packet_async_until(sh, MP_NOPTS_VALUE, out_pkt);
}
2784 
2785 // Like demux_read_packet_async(). They are the same for min_pts==MP_NOPTS_VALUE.
2786 // If min_pts is set, and the stream is lazily read (eager=false, interleaved
2787 // subtitles), then return 0 until demuxing has reached min_pts, or the queue
2788 // overflowed, or EOF was reached, or a packet was read for this stream.
demux_read_packet_async_until(struct sh_stream * sh,double min_pts,struct demux_packet ** out_pkt)2789 int demux_read_packet_async_until(struct sh_stream *sh, double min_pts,
2790                                   struct demux_packet **out_pkt)
2791 {
2792     struct demux_stream *ds = sh ? sh->ds : NULL;
2793     *out_pkt = NULL;
2794     if (!ds)
2795         return -1;
2796     struct demux_internal *in = ds->in;
2797 
2798     pthread_mutex_lock(&in->lock);
2799     int r = -1;
2800     while (1) {
2801         r = dequeue_packet(ds, min_pts, out_pkt);
2802         if (in->threading || in->blocked || r != 0)
2803             break;
2804         // Needs to actually read packets until we got a packet or EOF.
2805         thread_work(in);
2806     }
2807     pthread_mutex_unlock(&in->lock);
2808     return r;
2809 }
2810 
2811 // Read and return any packet we find. NULL means EOF.
2812 // Does not work with threading (don't call demux_start_thread()).
demux_read_any_packet(struct demuxer * demuxer)2813 struct demux_packet *demux_read_any_packet(struct demuxer *demuxer)
2814 {
2815     struct demux_internal *in = demuxer->in;
2816     pthread_mutex_lock(&in->lock);
2817     assert(!in->threading); // doesn't work with threading
2818     struct demux_packet *out_pkt = NULL;
2819     bool read_more = true;
2820     while (read_more && !in->blocked) {
2821         bool all_eof = true;
2822         for (int n = 0; n < in->num_streams; n++) {
2823             int r = dequeue_packet(in->streams[n]->ds, MP_NOPTS_VALUE, &out_pkt);
2824             if (r > 0)
2825                 goto done;
2826             if (r == 0)
2827                 all_eof = false;
2828         }
2829         // retry after calling this
2830         read_more = thread_work(in);
2831         read_more &= !all_eof;
2832     }
2833 done:
2834     pthread_mutex_unlock(&in->lock);
2835     return out_pkt;
2836 }
2837 
demuxer_help(struct mp_log * log,const m_option_t * opt,struct bstr name)2838 int demuxer_help(struct mp_log *log, const m_option_t *opt, struct bstr name)
2839 {
2840     int i;
2841 
2842     mp_info(log, "Available demuxers:\n");
2843     mp_info(log, " demuxer:   info:\n");
2844     for (i = 0; demuxer_list[i]; i++) {
2845         mp_info(log, "%10s  %s\n",
2846                 demuxer_list[i]->name, demuxer_list[i]->desc);
2847     }
2848     mp_info(log, "\n");
2849 
2850     return M_OPT_EXIT;
2851 }
2852 
d_level(enum demux_check level)2853 static const char *d_level(enum demux_check level)
2854 {
2855     switch (level) {
2856     case DEMUX_CHECK_FORCE:  return "force";
2857     case DEMUX_CHECK_UNSAFE: return "unsafe";
2858     case DEMUX_CHECK_REQUEST:return "request";
2859     case DEMUX_CHECK_NORMAL: return "normal";
2860     }
2861     abort();
2862 }
2863 
// Parse the leading floating point number of str into *out.
// Trailing text after the number is ignored. Returns 0 on success; returns -1
// (leaving *out untouched) if no number could be parsed or the resulting
// float is not finite.
static int decode_float(char *str, float *out)
{
    char *end = NULL;
    float value = strtod(str, &end);

    // Nothing consumed means there was no number at the start of str.
    if (end == NULL || end == str)
        return -1;
    if (!isfinite(value))
        return -1;

    *out = value;
    return 0;
}
2876 
decode_gain(struct mp_log * log,struct mp_tags * tags,const char * tag,float * out)2877 static int decode_gain(struct mp_log *log, struct mp_tags *tags,
2878                        const char *tag, float *out)
2879 {
2880     char *tag_val = NULL;
2881     float dec_val;
2882 
2883     tag_val = mp_tags_get_str(tags, tag);
2884     if (!tag_val)
2885         return -1;
2886 
2887     if (decode_float(tag_val, &dec_val) < 0) {
2888         mp_msg(log, MSGL_ERR, "Invalid replaygain value\n");
2889         return -1;
2890     }
2891 
2892     *out = dec_val;
2893     return 0;
2894 }
2895 
// Look up tag in tags and parse it as a replaygain peak value into *out.
// A missing tag is not an error: *out is set to 1.0 and 0 is returned.
// Returns -1 if the tag exists but cannot be parsed or is not positive; *out
// is 1.0 in that case too. log is unused.
static int decode_peak(struct mp_log *log, struct mp_tags *tags,
                       const char *tag, float *out)
{
    // Default, used when the tag is absent.
    *out = 1.0;

    char *text = mp_tags_get_str(tags, tag);
    if (text == NULL)
        return 0;

    float peak;
    if (decode_float(text, &peak) < 0)
        return -1;
    if (peak <= 0.0)
        return -1;

    *out = peak;
    return 0;
}
2914 
decode_rgain(struct mp_log * log,struct mp_tags * tags)2915 static struct replaygain_data *decode_rgain(struct mp_log *log,
2916                                             struct mp_tags *tags)
2917 {
2918     struct replaygain_data rg = {0};
2919 
2920     // Set values in *rg, using track gain as a fallback for album gain if the
2921     // latter is not present. This behavior matches that in demux/demux_lavf.c's
2922     // export_replaygain; if you change this, please make equivalent changes
2923     // there too.
2924     if (decode_gain(log, tags, "REPLAYGAIN_TRACK_GAIN", &rg.track_gain) >= 0 &&
2925         decode_peak(log, tags, "REPLAYGAIN_TRACK_PEAK", &rg.track_peak) >= 0)
2926     {
2927         if (decode_gain(log, tags, "REPLAYGAIN_ALBUM_GAIN", &rg.album_gain) < 0 ||
2928             decode_peak(log, tags, "REPLAYGAIN_ALBUM_PEAK", &rg.album_peak) < 0)
2929         {
2930             // Album gain is undefined; fall back to track gain.
2931             rg.album_gain = rg.track_gain;
2932             rg.album_peak = rg.track_peak;
2933         }
2934         return talloc_dup(NULL, &rg);
2935     }
2936 
2937     if (decode_gain(log, tags, "REPLAYGAIN_GAIN", &rg.track_gain) >= 0 &&
2938         decode_peak(log, tags, "REPLAYGAIN_PEAK", &rg.track_peak) >= 0)
2939     {
2940         rg.album_gain = rg.track_gain;
2941         rg.album_peak = rg.track_peak;
2942         return talloc_dup(NULL, &rg);
2943     }
2944 
2945     return NULL;
2946 }
2947 
// For every audio stream that has no replaygain data yet, try to derive it
// from the stream's tags, falling back to the demuxer's global metadata.
// Data that is found is reparented to the demux_internal.
static void demux_update_replaygain(demuxer_t *demuxer)
{
    struct demux_internal *in = demuxer->in;

    for (int i = 0; i < in->num_streams; i++) {
        struct sh_stream *sh = in->streams[i];
        if (sh->type != STREAM_AUDIO || sh->codec->replaygain_data)
            continue;

        struct replaygain_data *found = decode_rgain(demuxer->log, sh->tags);
        if (!found)
            found = decode_rgain(demuxer->log, demuxer->metadata);
        if (found)
            sh->codec->replaygain_data = talloc_steal(in, found);
    }
}
2962 
2963 // Copy some fields from src to dst (for initialization).
demux_copy(struct demuxer * dst,struct demuxer * src)2964 static void demux_copy(struct demuxer *dst, struct demuxer *src)
2965 {
2966     // Note that we do as shallow copies as possible. We expect the data
2967     // that is not-copied (only referenced) to be immutable.
2968     // This implies e.g. that no chapters are added after initialization.
2969     dst->chapters = src->chapters;
2970     dst->num_chapters = src->num_chapters;
2971     dst->editions = src->editions;
2972     dst->num_editions = src->num_editions;
2973     dst->edition = src->edition;
2974     dst->attachments = src->attachments;
2975     dst->num_attachments = src->num_attachments;
2976     dst->matroska_data = src->matroska_data;
2977     dst->playlist = src->playlist;
2978     dst->seekable = src->seekable;
2979     dst->partially_seekable = src->partially_seekable;
2980     dst->filetype = src->filetype;
2981     dst->ts_resets_possible = src->ts_resets_possible;
2982     dst->fully_read = src->fully_read;
2983     dst->start_time = src->start_time;
2984     dst->duration = src->duration;
2985     dst->is_network = src->is_network;
2986     dst->is_streaming = src->is_streaming;
2987     dst->stream_origin = src->stream_origin;
2988     dst->priv = src->priv;
2989     dst->metadata = mp_tags_dup(dst, src->metadata);
2990 }
2991 
2992 // Update metadata after initialization. If sh==NULL, it's global metadata,
2993 // otherwise it's bound to the stream. If pts==NOPTS, use the highest known pts
2994 // in the stream. Caller retains ownership of tags ptr. Called locked.
add_timed_metadata(struct demux_internal * in,struct mp_tags * tags,struct sh_stream * sh,double pts)2995 static void add_timed_metadata(struct demux_internal *in, struct mp_tags *tags,
2996                                struct sh_stream *sh, double pts)
2997 {
2998     struct demux_cached_range *r = in->current_range;
2999     if (!r)
3000         return;
3001 
3002     // We don't expect this, nor do we find it useful.
3003     if (sh && sh != in->metadata_stream)
3004         return;
3005 
3006     if (pts == MP_NOPTS_VALUE) {
3007         for (int n = 0; n < r->num_streams; n++)
3008             pts = MP_PTS_MAX(pts, r->streams[n]->last_ts);
3009 
3010         // Tends to happen when doing the initial icy update.
3011         if (pts == MP_NOPTS_VALUE)
3012             pts = in->d_thread->start_time;
3013     }
3014 
3015     struct timed_metadata *tm = talloc_zero(NULL, struct timed_metadata);
3016     *tm = (struct timed_metadata){
3017         .pts = pts,
3018         .tags = mp_tags_dup(tm, tags),
3019         .from_stream = !!sh,
3020     };
3021     MP_TARRAY_APPEND(r, r->metadata, r->num_metadata, tm);
3022 }
3023 
3024 // This is called by demuxer implementations if sh->tags changed. Note that
3025 // sh->tags itself is never actually changed (it's immutable, because sh->tags
3026 // can be accessed by the playback thread, and there is no synchronization).
3027 // pts is the time at/after which the metadata becomes effective. You're
3028 // supposed to call this ordered by time, and only while a packet is being
3029 // read.
3030 // Ownership of tags goes to the function.
demux_stream_tags_changed(struct demuxer * demuxer,struct sh_stream * sh,struct mp_tags * tags,double pts)3031 void demux_stream_tags_changed(struct demuxer *demuxer, struct sh_stream *sh,
3032                                struct mp_tags *tags, double pts)
3033 {
3034     struct demux_internal *in = demuxer->in;
3035     assert(demuxer == in->d_thread);
3036     struct demux_stream *ds = sh ? sh->ds : NULL;
3037     assert(!sh || ds); // stream must have been added
3038 
3039     pthread_mutex_lock(&in->lock);
3040 
3041     if (pts == MP_NOPTS_VALUE) {
3042         MP_WARN(in, "Discarding timed metadata without timestamp.\n");
3043     } else {
3044         add_timed_metadata(in, tags, sh, pts);
3045     }
3046     talloc_free(tags);
3047 
3048     pthread_mutex_unlock(&in->lock);
3049 }
3050 
3051 // This is called by demuxer implementations if demuxer->metadata changed.
3052 // (It will be propagated to the user as timed metadata.)
demux_metadata_changed(demuxer_t * demuxer)3053 void demux_metadata_changed(demuxer_t *demuxer)
3054 {
3055     assert(demuxer == demuxer->in->d_thread); // call from demuxer impl. only
3056     struct demux_internal *in = demuxer->in;
3057 
3058     pthread_mutex_lock(&in->lock);
3059     add_timed_metadata(in, demuxer->metadata, NULL, MP_NOPTS_VALUE);
3060     pthread_mutex_unlock(&in->lock);
3061 }
3062 
3063 // Called locked, with user demuxer.
update_final_metadata(demuxer_t * demuxer,struct timed_metadata * tm)3064 static void update_final_metadata(demuxer_t *demuxer, struct timed_metadata *tm)
3065 {
3066     assert(demuxer == demuxer->in->d_user);
3067     struct demux_internal *in = demuxer->in;
3068 
3069     struct mp_tags *dyn_tags = NULL;
3070 
3071     // Often useful for audio-only files, which have metadata in the audio track
3072     // metadata instead of the main metadata, but can also have cover art
3073     // metadata (which libavformat likes to treat as video streams).
3074     int astreams = 0;
3075     int astream_id = -1;
3076     int vstreams = 0;
3077     for (int n = 0; n < in->num_streams; n++) {
3078         struct sh_stream *sh = in->streams[n];
3079         if (sh->type == STREAM_VIDEO && !sh->attached_picture)
3080             vstreams += 1;
3081         if (sh->type == STREAM_AUDIO) {
3082             astreams += 1;
3083             astream_id = n;
3084         }
3085     }
3086 
3087     // Use the metadata_stream tags only if this really seems to be an audio-
3088     // only stream. Otherwise it will happen too often that "uninteresting"
3089     // stream metadata will trash the actual file tags.
3090     if (vstreams == 0 && astreams == 1 &&
3091         in->streams[astream_id] == in->metadata_stream)
3092     {
3093         dyn_tags = in->metadata_stream->tags;
3094         if (tm && tm->from_stream)
3095             dyn_tags = tm->tags;
3096     }
3097 
3098     // Global metadata updates.
3099     if (tm && !tm->from_stream)
3100         dyn_tags = tm->tags;
3101 
3102     if (dyn_tags)
3103         mp_tags_merge(demuxer->metadata, dyn_tags);
3104 }
3105 
// Find the timed metadata entry of the current range that applies at pts.
// Returns NULL if there is no current range, it has no metadata entries, or
// pts is MP_NOPTS_VALUE. Otherwise returns the entry directly before the
// first scanned entry whose pts is >= the given pts, or the last entry if no
// such entry exists.
// Side effect: in->cached_metadata_index is set to the index of the returned
// entry, and is used as a starting hint by the next call.
// NOTE(review): the scan presumably relies on entries being ordered by pts -
// confirm against the code that appends them.
static struct timed_metadata *lookup_timed_metadata(struct demux_internal *in,
                                                    double pts)
{
    struct demux_cached_range *r = in->current_range;

    if (!r || !r->num_metadata || pts == MP_NOPTS_VALUE)
        return NULL;

    // Scanning starts at index 1 (the result is always "n - 1", so index 0 is
    // the lowest possible result), or right after the cached index if that
    // one is still valid and not past pts.
    int start = 1;
    int i = in->cached_metadata_index;
    if (i >= 0 &&  i < r->num_metadata && r->metadata[i]->pts <= pts)
        start = i + 1;

    // Default to the last entry, in case no entry at/after pts is found.
    in->cached_metadata_index = r->num_metadata - 1;
    for (int n = start; n < r->num_metadata; n++) {
        if (r->metadata[n]->pts >= pts) {
            in->cached_metadata_index = n - 1;
            break;
        }
    }

    return r->metadata[in->cached_metadata_index];
}
3129 
// Called by the user thread (i.e. player) to update metadata and other things
// from the demuxer thread.
// The pts parameter is the current playback position.
// Takes in->lock for the duration of the call. Pending events from the
// demuxer side (in->events) are moved into demuxer->events.
void demux_update(demuxer_t *demuxer, double pts)
{
    assert(demuxer == demuxer->in->d_user);
    struct demux_internal *in = demuxer->in;

    pthread_mutex_lock(&in->lock);

    // Without a demuxer thread, the cache update has to be run from here.
    if (!in->threading)
        update_cache(in);

    // This implies this function is actually called from "the" user thread.
    in->d_user->filesize = in->stream_size;

    // Remove the timestamp offset before looking up timed metadata.
    pts = MP_ADD_PTS(pts, -in->ts_offset);

    // Compare the entry for the previous playback position with the one for
    // the new position; only a change (or a forced update) triggers a merge.
    struct timed_metadata *prev = lookup_timed_metadata(in, in->last_playback_pts);
    struct timed_metadata *cur = lookup_timed_metadata(in, pts);
    if (prev != cur || in->force_metadata_update) {
        in->force_metadata_update = false;
        update_final_metadata(demuxer, cur);
        demuxer->events |= DEMUX_EVENT_METADATA;
    }

    in->last_playback_pts = pts;

    demuxer->events |= in->events;
    in->events = 0;
    if (demuxer->events & (DEMUX_EVENT_METADATA | DEMUX_EVENT_STREAMS))
        demux_update_replaygain(demuxer);
    if (demuxer->events & DEMUX_EVENT_DURATION)
        demuxer->duration = in->duration;

    pthread_mutex_unlock(&in->lock);
}
3167 
demux_init_cuesheet(struct demuxer * demuxer)3168 static void demux_init_cuesheet(struct demuxer *demuxer)
3169 {
3170     char *cue = mp_tags_get_str(demuxer->metadata, "cuesheet");
3171     if (cue && !demuxer->num_chapters) {
3172         struct cue_file *f = mp_parse_cue(bstr0(cue));
3173         if (f) {
3174             if (mp_check_embedded_cue(f) < 0) {
3175                 MP_WARN(demuxer, "Embedded cue sheet references more than one file. "
3176                         "Ignoring it.\n");
3177             } else {
3178                 for (int n = 0; n < f->num_tracks; n++) {
3179                     struct cue_track *t = &f->tracks[n];
3180                     int idx = demuxer_add_chapter(demuxer, "", t->start, -1);
3181                     mp_tags_merge(demuxer->chapters[idx].metadata, t->tags);
3182                 }
3183             }
3184         }
3185         talloc_free(f);
3186     }
3187 }
3188 
3189 // A demuxer can use this during opening if all data was read from the stream.
3190 // Calling this after opening was completed is not allowed. Also, if opening
3191 // failed, this must not be called (or trying another demuxer would fail).
3192 // Useful so that e.g. subtitles don't keep the file or socket open.
3193 // If there's ever the situation where we can't allow the demuxer to close
3194 // the stream, this function could ignore the request.
demux_close_stream(struct demuxer * demuxer)3195 void demux_close_stream(struct demuxer *demuxer)
3196 {
3197     struct demux_internal *in = demuxer->in;
3198     assert(!in->threading && demuxer == in->d_thread);
3199 
3200     if (!demuxer->stream || !in->owns_stream)
3201         return;
3202 
3203     MP_VERBOSE(demuxer, "demuxer read all data; closing stream\n");
3204     free_stream(demuxer->stream);
3205     demuxer->stream = NULL;
3206     in->d_user->stream = NULL;
3207 }
3208 
demux_init_ccs(struct demuxer * demuxer,struct demux_opts * opts)3209 static void demux_init_ccs(struct demuxer *demuxer, struct demux_opts *opts)
3210 {
3211     struct demux_internal *in = demuxer->in;
3212     if (!opts->create_ccs)
3213         return;
3214     pthread_mutex_lock(&in->lock);
3215     for (int n = 0; n < in->num_streams; n++) {
3216         struct sh_stream *sh = in->streams[n];
3217         if (sh->type == STREAM_VIDEO && !sh->attached_picture)
3218             demuxer_get_cc_track_locked(sh);
3219     }
3220     pthread_mutex_unlock(&in->lock);
3221 }
3222 
3223 // Return whether "heavy" caching on this stream is enabled. By default, this
3224 // corresponds to whether the source stream is considered in the network. The
3225 // only effect should be adjusting display behavior (of cache stats etc.), and
3226 // possibly switching between which set of options influence cache settings.
demux_is_network_cached(demuxer_t * demuxer)3227 bool demux_is_network_cached(demuxer_t *demuxer)
3228 {
3229     struct demux_internal *in = demuxer->in;
3230     pthread_mutex_lock(&in->lock);
3231     bool r = in->using_network_cache_opts;
3232     pthread_mutex_unlock(&in->lock);
3233     return r;
3234 }
3235 
// Snapshot of stream properties handed from demux_open() to
// open_given_type(), which copies them into each demuxer it creates.
struct parent_stream_info {
    bool seekable;              // from stream->seekable
    bool is_network;            // from stream->is_network
    bool is_streaming;          // from stream->streaming
    int stream_origin;          // from stream->stream_origin
    struct mp_cancel *cancel;   // tested for cancellation before each open
    char *filename;             // talloc copy of stream->url; freed by demux_open()
};
3244 
open_given_type(struct mpv_global * global,struct mp_log * log,const struct demuxer_desc * desc,struct stream * stream,struct parent_stream_info * sinfo,struct demuxer_params * params,enum demux_check check)3245 static struct demuxer *open_given_type(struct mpv_global *global,
3246                                        struct mp_log *log,
3247                                        const struct demuxer_desc *desc,
3248                                        struct stream *stream,
3249                                        struct parent_stream_info *sinfo,
3250                                        struct demuxer_params *params,
3251                                        enum demux_check check)
3252 {
3253     if (mp_cancel_test(sinfo->cancel))
3254         return NULL;
3255 
3256     struct demuxer *demuxer = talloc_ptrtype(NULL, demuxer);
3257     struct m_config_cache *opts_cache =
3258         m_config_cache_alloc(demuxer, global, &demux_conf);
3259     struct demux_opts *opts = opts_cache->opts;
3260     *demuxer = (struct demuxer) {
3261         .desc = desc,
3262         .stream = stream,
3263         .cancel = sinfo->cancel,
3264         .seekable = sinfo->seekable,
3265         .filepos = -1,
3266         .global = global,
3267         .log = mp_log_new(demuxer, log, desc->name),
3268         .glog = log,
3269         .filename = talloc_strdup(demuxer, sinfo->filename),
3270         .is_network = sinfo->is_network,
3271         .is_streaming = sinfo->is_streaming,
3272         .stream_origin = sinfo->stream_origin,
3273         .access_references = opts->access_references,
3274         .events = DEMUX_EVENT_ALL,
3275         .duration = -1,
3276     };
3277 
3278     struct demux_internal *in = demuxer->in = talloc_ptrtype(demuxer, in);
3279     *in = (struct demux_internal){
3280         .global = global,
3281         .log = demuxer->log,
3282         .stats = stats_ctx_create(in, global, "demuxer"),
3283         .can_cache = params && params->is_top_level,
3284         .can_record = params && params->stream_record,
3285         .opts = opts,
3286         .opts_cache = opts_cache,
3287         .d_thread = talloc(demuxer, struct demuxer),
3288         .d_user = demuxer,
3289         .after_seek = true, // (assumed identical to initial demuxer state)
3290         .after_seek_to_start = true,
3291         .highest_av_pts = MP_NOPTS_VALUE,
3292         .seeking_in_progress = MP_NOPTS_VALUE,
3293         .demux_ts = MP_NOPTS_VALUE,
3294         .owns_stream = !params->external_stream,
3295     };
3296     pthread_mutex_init(&in->lock, NULL);
3297     pthread_cond_init(&in->wakeup, NULL);
3298 
3299     *in->d_thread = *demuxer;
3300 
3301     in->d_thread->metadata = talloc_zero(in->d_thread, struct mp_tags);
3302 
3303     mp_dbg(log, "Trying demuxer: %s (force-level: %s)\n",
3304            desc->name, d_level(check));
3305 
3306     if (stream)
3307         stream_seek(stream, 0);
3308 
3309     in->d_thread->params = params; // temporary during open()
3310     int ret = demuxer->desc->open(in->d_thread, check);
3311     if (ret >= 0) {
3312         in->d_thread->params = NULL;
3313         if (in->d_thread->filetype)
3314             mp_verbose(log, "Detected file format: %s (%s)\n",
3315                        in->d_thread->filetype, desc->desc);
3316         else
3317             mp_verbose(log, "Detected file format: %s\n", desc->desc);
3318         if (!in->d_thread->seekable)
3319             mp_verbose(log, "Stream is not seekable.\n");
3320         if (!in->d_thread->seekable && opts->force_seekable) {
3321             mp_warn(log, "Not seekable, but enabling seeking on user request.\n");
3322             in->d_thread->seekable = true;
3323             in->d_thread->partially_seekable = true;
3324         }
3325         demux_init_cuesheet(in->d_thread);
3326         demux_init_ccs(demuxer, opts);
3327         demux_convert_tags_charset(in->d_thread);
3328         demux_copy(in->d_user, in->d_thread);
3329         in->duration = in->d_thread->duration;
3330         demuxer_sort_chapters(demuxer);
3331         in->events = DEMUX_EVENT_ALL;
3332 
3333         struct demuxer *sub = NULL;
3334         if (!(params && params->disable_timeline)) {
3335             struct timeline *tl = timeline_load(global, log, demuxer);
3336             if (tl) {
3337                 struct demuxer_params params2 = {0};
3338                 params2.timeline = tl;
3339                 params2.is_top_level = params && params->is_top_level;
3340                 params2.stream_record = params && params->stream_record;
3341                 sub =
3342                     open_given_type(global, log, &demuxer_desc_timeline,
3343                                     NULL, sinfo, &params2, DEMUX_CHECK_FORCE);
3344                 if (sub) {
3345                     in->can_cache = false;
3346                     in->can_record = false;
3347                 } else {
3348                     timeline_destroy(tl);
3349                 }
3350             }
3351         }
3352 
3353         switch_to_fresh_cache_range(in);
3354 
3355         update_opts(in);
3356 
3357         demux_update(demuxer, MP_NOPTS_VALUE);
3358 
3359         demuxer = sub ? sub : demuxer;
3360         return demuxer;
3361     }
3362 
3363     demuxer->stream = NULL;
3364     demux_free(demuxer);
3365     return NULL;
3366 }
3367 
// Lists of check levels that demux_open() tries in order, one pass over the
// demuxer list per entry. Each list is terminated by -1.
// d_normal is the default; d_request is used when a format is requested by
// name, d_force when the requested name is prefixed with '+'.
static const int d_normal[]  = {DEMUX_CHECK_NORMAL, DEMUX_CHECK_UNSAFE, -1};
static const int d_request[] = {DEMUX_CHECK_REQUEST, -1};
static const int d_force[]   = {DEMUX_CHECK_FORCE, -1};
3371 
3372 // params can be NULL
3373 // This may free the stream parameter on success.
demux_open(struct stream * stream,struct mp_cancel * cancel,struct demuxer_params * params,struct mpv_global * global)3374 static struct demuxer *demux_open(struct stream *stream,
3375                                   struct mp_cancel *cancel,
3376                                   struct demuxer_params *params,
3377                                   struct mpv_global *global)
3378 {
3379     const int *check_levels = d_normal;
3380     const struct demuxer_desc *check_desc = NULL;
3381     struct mp_log *log = mp_log_new(NULL, global->log, "!demux");
3382     struct demuxer *demuxer = NULL;
3383     char *force_format = params ? params->force_format : NULL;
3384 
3385     if (!force_format)
3386         force_format = stream->demuxer;
3387 
3388     if (force_format && force_format[0] && !stream->is_directory) {
3389         check_levels = d_request;
3390         if (force_format[0] == '+') {
3391             force_format += 1;
3392             check_levels = d_force;
3393         }
3394         for (int n = 0; demuxer_list[n]; n++) {
3395             if (strcmp(demuxer_list[n]->name, force_format) == 0)
3396                 check_desc = demuxer_list[n];
3397         }
3398         if (!check_desc) {
3399             mp_err(log, "Demuxer %s does not exist.\n", force_format);
3400             goto done;
3401         }
3402     }
3403 
3404     struct parent_stream_info sinfo = {
3405         .seekable = stream->seekable,
3406         .is_network = stream->is_network,
3407         .is_streaming = stream->streaming,
3408         .stream_origin = stream->stream_origin,
3409         .cancel = cancel,
3410         .filename = talloc_strdup(NULL, stream->url),
3411     };
3412 
3413     // Test demuxers from first to last, one pass for each check_levels[] entry
3414     for (int pass = 0; check_levels[pass] != -1; pass++) {
3415         enum demux_check level = check_levels[pass];
3416         mp_verbose(log, "Trying demuxers for level=%s.\n", d_level(level));
3417         for (int n = 0; demuxer_list[n]; n++) {
3418             const struct demuxer_desc *desc = demuxer_list[n];
3419             if (!check_desc || desc == check_desc) {
3420                 demuxer = open_given_type(global, log, desc, stream, &sinfo,
3421                                           params, level);
3422                 if (demuxer) {
3423                     talloc_steal(demuxer, log);
3424                     log = NULL;
3425                     goto done;
3426                 }
3427             }
3428         }
3429     }
3430 
3431 done:
3432     talloc_free(sinfo.filename);
3433     talloc_free(log);
3434     return demuxer;
3435 }
3436 
// Build a stream that yields the bytes of init followed by the contents of
// real. On failure both the temporary memory stream and real are freed, and
// NULL is returned.
static struct stream *create_webshit_concat_stream(struct mpv_global *global,
                                                   struct mp_cancel *c,
                                                   bstr init, struct stream *real)
{
    struct stream *mem = stream_memory_open(global, init.start, init.len);
    assert(mem);

    struct stream *parts[2] = {mem, real};
    struct stream *joined = stream_concat_open(global, c, parts, 2);
    if (joined)
        return joined;

    free_stream(mem);
    free_stream(real);
    return NULL;
}
3452 
3453 // Convenience function: open the stream, enable the cache (according to params
3454 // and global opts.), open the demuxer.
3455 // Also for some reason may close the opened stream if it's not needed.
3456 // demuxer->cancel is not the cancel parameter, but is its own object that will
3457 // be a slave (mp_cancel_set_parent()) to provided cancel object.
3458 // demuxer->cancel is automatically freed.
demux_open_url(const char * url,struct demuxer_params * params,struct mp_cancel * cancel,struct mpv_global * global)3459 struct demuxer *demux_open_url(const char *url,
3460                                struct demuxer_params *params,
3461                                struct mp_cancel *cancel,
3462                                struct mpv_global *global)
3463 {
3464     if (!params)
3465         return NULL;
3466     struct mp_cancel *priv_cancel = mp_cancel_new(NULL);
3467     if (cancel)
3468         mp_cancel_set_parent(priv_cancel, cancel);
3469     struct stream *s = params->external_stream;
3470     if (!s) {
3471         s = stream_create(url, STREAM_READ | params->stream_flags,
3472                           priv_cancel, global);
3473         if (s && params->init_fragment.len) {
3474             s = create_webshit_concat_stream(global, priv_cancel,
3475                                              params->init_fragment, s);
3476         }
3477     }
3478     if (!s) {
3479         talloc_free(priv_cancel);
3480         return NULL;
3481     }
3482     struct demuxer *d = demux_open(s, priv_cancel, params, global);
3483     if (d) {
3484         talloc_steal(d->in, priv_cancel);
3485         assert(d->cancel);
3486     } else {
3487         params->demuxer_failed = true;
3488         if (!params->external_stream)
3489             free_stream(s);
3490         talloc_free(priv_cancel);
3491     }
3492     return d;
3493 }
3494 
3495 // clear the packet queues
demux_flush(demuxer_t * demuxer)3496 void demux_flush(demuxer_t *demuxer)
3497 {
3498     struct demux_internal *in = demuxer->in;
3499     assert(demuxer == in->d_user);
3500 
3501     pthread_mutex_lock(&in->lock);
3502     clear_reader_state(in, true);
3503     for (int n = 0; n < in->num_ranges; n++)
3504         clear_cached_range(in, in->ranges[n]);
3505     free_empty_cached_ranges(in);
3506     for (int n = 0; n < in->num_streams; n++) {
3507         struct demux_stream *ds = in->streams[n]->ds;
3508         ds->refreshing = false;
3509         ds->eof = false;
3510     }
3511     in->eof = false;
3512     in->seeking = false;
3513     pthread_mutex_unlock(&in->lock);
3514 }
3515 
3516 // Does some (but not all) things for switching to another range.
switch_current_range(struct demux_internal * in,struct demux_cached_range * range)3517 static void switch_current_range(struct demux_internal *in,
3518                                  struct demux_cached_range *range)
3519 {
3520     struct demux_cached_range *old = in->current_range;
3521     assert(old != range);
3522 
3523     set_current_range(in, range);
3524 
3525     if (old) {
3526         // Remove packets which can't be used when seeking back to the range.
3527         for (int n = 0; n < in->num_streams; n++) {
3528             struct demux_queue *queue = old->streams[n];
3529 
3530             // Remove all packets which cannot be involved in seeking.
3531             while (queue->head && !queue->head->keyframe)
3532                 remove_head_packet(queue);
3533         }
3534 
3535         // Exclude weird corner cases that break resuming.
3536         for (int n = 0; n < in->num_streams; n++) {
3537             struct demux_stream *ds = in->streams[n]->ds;
3538             // This is needed to resume or join the range at all.
3539             if (ds->selected && !(ds->global_correct_dts ||
3540                                   ds->global_correct_pos))
3541             {
3542                 MP_VERBOSE(in, "discarding unseekable range due to stream %d\n", n);
3543                 clear_cached_range(in, old);
3544                 break;
3545             }
3546         }
3547     }
3548 
3549     // Set up reading from new range (as well as writing to it).
3550     for (int n = 0; n < in->num_streams; n++) {
3551         struct demux_stream *ds = in->streams[n]->ds;
3552 
3553         ds->queue = range->streams[n];
3554         ds->refreshing = false;
3555         ds->eof = false;
3556     }
3557 
3558     // No point in keeping any junk (especially if old current_range is empty).
3559     free_empty_cached_ranges(in);
3560 
3561     // The change detection doesn't work across ranges.
3562     in->force_metadata_update = true;
3563 }
3564 
3565 // Search for the entry with the highest index with entry.pts <= pts true.
search_index(struct demux_queue * queue,double pts)3566 static struct demux_packet *search_index(struct demux_queue *queue, double pts)
3567 {
3568     size_t a = 0;
3569     size_t b = queue->num_index;
3570 
3571     while (a < b) {
3572         size_t m = a + (b - a) / 2;
3573         struct index_entry *e = &QUEUE_INDEX_ENTRY(queue, m);
3574 
3575         bool m_ok = e->pts <= pts;
3576 
3577         if (a + 1 == b)
3578             return m_ok ? e->pkt : NULL;
3579 
3580         if (m_ok) {
3581             a = m;
3582         } else {
3583             b = m;
3584         }
3585     }
3586 
3587     return NULL;
3588 }
3589 
find_seek_target(struct demux_queue * queue,double pts,int flags)3590 static struct demux_packet *find_seek_target(struct demux_queue *queue,
3591                                              double pts, int flags)
3592 {
3593     pts -= queue->ds->sh->seek_preroll;
3594 
3595     struct demux_packet *start = search_index(queue, pts);
3596     if (!start)
3597         start = queue->head;
3598 
3599     struct demux_packet *target = NULL;
3600     struct demux_packet *next = NULL;
3601     for (struct demux_packet *dp = start; dp; dp = next) {
3602         next = dp->next;
3603         if (!dp->keyframe)
3604             continue;
3605 
3606         double range_pts;
3607         next = compute_keyframe_times(dp, &range_pts, NULL);
3608 
3609         if (range_pts == MP_NOPTS_VALUE)
3610             continue;
3611 
3612         if (flags & SEEK_FORWARD) {
3613             // Stop on the first packet that is >= pts.
3614             if (target)
3615                 break;
3616             if (range_pts < pts)
3617                 continue;
3618         } else {
3619             // Stop before the first packet that is > pts.
3620             // This still returns a packet with > pts if there's no better one.
3621             if (target && range_pts > pts)
3622                 break;
3623         }
3624 
3625         target = dp;
3626     }
3627 
3628     return target;
3629 }
3630 
3631 // Return a cache range for the given pts/flags, or NULL if none available.
3632 // must be called locked
find_cache_seek_range(struct demux_internal * in,double pts,int flags)3633 static struct demux_cached_range *find_cache_seek_range(struct demux_internal *in,
3634                                                         double pts, int flags)
3635 {
3636     // Note about queued low level seeks: in->seeking can be true here, and it
3637     // might come from a previous resume seek to the current range. If we end
3638     // up seeking into the current range (i.e. just changing time offset), the
3639     // seek needs to continue. Otherwise, we override the queued seek anyway.
3640     if ((flags & SEEK_FACTOR) || !in->seekable_cache)
3641         return NULL;
3642 
3643     struct demux_cached_range *res = NULL;
3644 
3645     for (int n = 0; n < in->num_ranges; n++) {
3646         struct demux_cached_range *r = in->ranges[n];
3647         if (r->seek_start != MP_NOPTS_VALUE) {
3648             MP_VERBOSE(in, "cached range %d: %f <-> %f (bof=%d, eof=%d)\n",
3649                        n, r->seek_start, r->seek_end, r->is_bof, r->is_eof);
3650 
3651             if ((pts >= r->seek_start || r->is_bof) &&
3652                 (pts <= r->seek_end || r->is_eof))
3653             {
3654                 MP_VERBOSE(in, "...using this range for in-cache seek.\n");
3655                 res = r;
3656                 break;
3657             }
3658         }
3659     }
3660 
3661     return res;
3662 }
3663 
3664 // Adjust the seek target to the found video key frames. Otherwise the
3665 // video will undershoot the seek target, while audio will be closer to it.
3666 // The player frontend will play the additional video without audio, so
3667 // you get silent audio for the amount of "undershoot". Adjusting the seek
3668 // target will make the audio seek to the video target or before.
3669 // (If hr-seeks are used, it's better to skip this, as it would only mean
3670 // that more audio data than necessary would have to be decoded.)
adjust_cache_seek_target(struct demux_internal * in,struct demux_cached_range * range,double * pts,int * flags)3671 static void adjust_cache_seek_target(struct demux_internal *in,
3672                                      struct demux_cached_range *range,
3673                                      double *pts, int *flags)
3674 {
3675     if (*flags & SEEK_HR)
3676         return;
3677 
3678     for (int n = 0; n < in->num_streams; n++) {
3679         struct demux_stream *ds = in->streams[n]->ds;
3680         struct demux_queue *queue = range->streams[n];
3681         if (ds->selected && ds->type == STREAM_VIDEO) {
3682             struct demux_packet *target = find_seek_target(queue, *pts, *flags);
3683             if (target) {
3684                 double target_pts;
3685                 compute_keyframe_times(target, &target_pts, NULL);
3686                 if (target_pts != MP_NOPTS_VALUE) {
3687                     MP_VERBOSE(in, "adjust seek target %f -> %f\n",
3688                                *pts, target_pts);
3689                     // (We assume the find_seek_target() call will return
3690                     // the same target for the video stream.)
3691                     *pts = target_pts;
3692                     *flags &= ~SEEK_FORWARD;
3693                 }
3694             }
3695             break;
3696         }
3697     }
3698 }
3699 
3700 // must be called locked
3701 // range must be non-NULL and from find_cache_seek_range() using the same pts
3702 // and flags, before any other changes to the cached state
execute_cache_seek(struct demux_internal * in,struct demux_cached_range * range,double pts,int flags)3703 static void execute_cache_seek(struct demux_internal *in,
3704                                struct demux_cached_range *range,
3705                                double pts, int flags)
3706 {
3707     adjust_cache_seek_target(in, range, &pts, &flags);
3708 
3709     for (int n = 0; n < in->num_streams; n++) {
3710         struct demux_stream *ds = in->streams[n]->ds;
3711         struct demux_queue *queue = range->streams[n];
3712 
3713         struct demux_packet *target = find_seek_target(queue, pts, flags);
3714         ds->reader_head = target;
3715         ds->skip_to_keyframe = !target;
3716         if (ds->reader_head)
3717             ds->base_ts = MP_PTS_OR_DEF(ds->reader_head->pts, ds->reader_head->dts);
3718 
3719         MP_VERBOSE(in, "seeking stream %d (%s) to ",
3720                    n, stream_type_name(ds->type));
3721 
3722         if (target) {
3723             MP_VERBOSE(in, "packet %f/%f\n", target->pts, target->dts);
3724         } else {
3725             MP_VERBOSE(in, "nothing\n");
3726         }
3727     }
3728 
3729     // If we seek to another range, we want to seek the low level demuxer to
3730     // there as well, because reader and demuxer queue must be the same.
3731     if (in->current_range != range) {
3732         switch_current_range(in, range);
3733 
3734         in->seeking = true;
3735         in->seek_flags = SEEK_HR;
3736         in->seek_pts = range->seek_end - 1.0;
3737 
3738         // When new packets are being appended, they could overlap with the old
3739         // range due to demuxer seek imprecisions, or because the queue contains
3740         // packets past the seek target but before the next seek target. Don't
3741         // append them twice, instead skip them until new packets are found.
3742         for (int n = 0; n < in->num_streams; n++) {
3743             struct demux_stream *ds = in->streams[n]->ds;
3744 
3745             ds->refreshing = ds->selected;
3746         }
3747 
3748         MP_VERBOSE(in, "resuming demuxer to end of cached range\n");
3749     }
3750 }
3751 
3752 // Create a new blank cache range, and backup the old one. If the seekable
3753 // demuxer cache is disabled, merely reset the current range to a blank state.
switch_to_fresh_cache_range(struct demux_internal * in)3754 static void switch_to_fresh_cache_range(struct demux_internal *in)
3755 {
3756     if (!in->seekable_cache && in->current_range) {
3757         clear_cached_range(in, in->current_range);
3758         return;
3759     }
3760 
3761     struct demux_cached_range *range = talloc_ptrtype(NULL, range);
3762     *range = (struct demux_cached_range){
3763         .seek_start = MP_NOPTS_VALUE,
3764         .seek_end = MP_NOPTS_VALUE,
3765     };
3766     MP_TARRAY_APPEND(in, in->ranges, in->num_ranges, range);
3767     add_missing_streams(in, range);
3768 
3769     switch_current_range(in, range);
3770 }
3771 
// Queue a seek to seek_pts on behalf of the user thread. Unless SEEK_FACTOR
// is set, seek_pts is adjusted by the demuxer's ts_offset first. Returns the
// result of queue_seek() (non-zero if the seek was queued).
int demux_seek(demuxer_t *demuxer, double seek_pts, int flags)
{
    struct demux_internal *in = demuxer->in;
    assert(demuxer == in->d_user);

    pthread_mutex_lock(&in->lock);

    bool is_factor = (flags & SEEK_FACTOR) != 0;
    if (!is_factor)
        seek_pts = MP_ADD_PTS(seek_pts, -in->ts_offset);

    int res = queue_seek(in, seek_pts, flags, true);

    // Signal the wakeup condition before releasing the lock.
    pthread_cond_signal(&in->wakeup);
    pthread_mutex_unlock(&in->lock);

    return res;
}
3789 
queue_seek(struct demux_internal * in,double seek_pts,int flags,bool clear_back_state)3790 static bool queue_seek(struct demux_internal *in, double seek_pts, int flags,
3791                        bool clear_back_state)
3792 {
3793     if (seek_pts == MP_NOPTS_VALUE)
3794         return false;
3795 
3796     MP_VERBOSE(in, "queuing seek to %f%s\n", seek_pts,
3797                in->seeking ? " (cascade)" : "");
3798 
3799     bool require_cache = flags & SEEK_CACHED;
3800     flags &= ~(unsigned)SEEK_CACHED;
3801 
3802     bool set_backwards = flags & SEEK_SATAN;
3803     flags &= ~(unsigned)SEEK_SATAN;
3804 
3805     bool force_seek = flags & SEEK_FORCE;
3806     flags &= ~(unsigned)SEEK_FORCE;
3807 
3808     bool block = flags & SEEK_BLOCK;
3809     flags &= ~(unsigned)SEEK_BLOCK;
3810 
3811     struct demux_cached_range *cache_target =
3812         find_cache_seek_range(in, seek_pts, flags);
3813 
3814     if (!cache_target) {
3815         if (require_cache) {
3816             MP_VERBOSE(in, "Cached seek not possible.\n");
3817             return false;
3818         }
3819         if (!in->d_thread->seekable && !force_seek) {
3820             MP_WARN(in, "Cannot seek in this file.\n");
3821             return false;
3822         }
3823     }
3824 
3825     in->eof = false;
3826     in->reading = false;
3827     in->back_demuxing = set_backwards;
3828 
3829     clear_reader_state(in, clear_back_state);
3830 
3831     in->blocked = block;
3832 
3833     if (cache_target) {
3834         execute_cache_seek(in, cache_target, seek_pts, flags);
3835     } else {
3836         switch_to_fresh_cache_range(in);
3837 
3838         in->seeking = true;
3839         in->seek_flags = flags;
3840         in->seek_pts = seek_pts;
3841     }
3842 
3843     for (int n = 0; n < in->num_streams; n++) {
3844         struct demux_stream *ds = in->streams[n]->ds;
3845 
3846         if (in->back_demuxing) {
3847             if (ds->back_seek_pos == MP_NOPTS_VALUE)
3848                 ds->back_seek_pos = seek_pts;
3849             // Process possibly cached packets.
3850             back_demux_see_packets(in->streams[n]->ds);
3851         }
3852 
3853         wakeup_ds(ds);
3854     }
3855 
3856     if (!in->threading && in->seeking)
3857         execute_seek(in);
3858 
3859     return true;
3860 }
3861 
demuxer_stream_by_demuxer_id(struct demuxer * d,enum stream_type t,int id)3862 struct sh_stream *demuxer_stream_by_demuxer_id(struct demuxer *d,
3863                                                enum stream_type t, int id)
3864 {
3865     if (id < 0)
3866         return NULL;
3867     int num = demux_get_num_stream(d);
3868     for (int n = 0; n < num; n++) {
3869         struct sh_stream *s = demux_get_stream(d, n);
3870         if (s->type == t && s->demuxer_id == id)
3871             return s;
3872     }
3873     return NULL;
3874 }
3875 
3876 // An obscure mechanism to get stream switching to be executed "faster" (as
3877 // perceived by the user), by making the stream return packets from the
3878 // current position
3879 // On a switch, it seeks back, and then grabs all packets that were
3880 // "missing" from the packet queue of the newly selected stream.
initiate_refresh_seek(struct demux_internal * in,struct demux_stream * stream,double start_ts)3881 static void initiate_refresh_seek(struct demux_internal *in,
3882                                   struct demux_stream *stream,
3883                                   double start_ts)
3884 {
3885     struct demuxer *demux = in->d_thread;
3886     bool seekable = demux->desc->seek && demux->seekable &&
3887                     !demux->partially_seekable;
3888 
3889     bool normal_seek = true;
3890     bool refresh_possible = true;
3891     for (int n = 0; n < in->num_streams; n++) {
3892         struct demux_stream *ds = in->streams[n]->ds;
3893 
3894         if (!ds->selected)
3895             continue;
3896 
3897         if (ds->type == STREAM_VIDEO || ds->type == STREAM_AUDIO)
3898             start_ts = MP_PTS_MIN(start_ts, ds->base_ts);
3899 
3900         // If there were no other streams selected, we can use a normal seek.
3901         normal_seek &= stream == ds;
3902 
3903         refresh_possible &= ds->queue->correct_dts || ds->queue->correct_pos;
3904     }
3905 
3906     if (start_ts == MP_NOPTS_VALUE || !seekable)
3907         return;
3908 
3909     if (!normal_seek) {
3910         if (!refresh_possible) {
3911             MP_VERBOSE(in, "can't issue refresh seek\n");
3912             return;
3913         }
3914 
3915         for (int n = 0; n < in->num_streams; n++) {
3916             struct demux_stream *ds = in->streams[n]->ds;
3917 
3918             bool correct_pos = ds->queue->correct_pos;
3919             bool correct_dts = ds->queue->correct_dts;
3920 
3921             // We need to re-read all packets anyway, so discard the buffered
3922             // data. (In theory, we could keep the packets, and be able to use
3923             // it for seeking if partially read streams are deselected again,
3924             // but this causes other problems like queue overflows when
3925             // selecting a new stream.)
3926             ds_clear_reader_queue_state(ds);
3927             clear_queue(ds->queue);
3928 
3929             // Streams which didn't have any packets yet will return all packets,
3930             // other streams return packets only starting from the last position.
3931             if (ds->selected && (ds->last_ret_pos != -1 ||
3932                                  ds->last_ret_dts != MP_NOPTS_VALUE))
3933             {
3934                 ds->refreshing = true;
3935                 ds->queue->correct_dts = correct_dts;
3936                 ds->queue->correct_pos = correct_pos;
3937                 ds->queue->last_pos = ds->last_ret_pos;
3938                 ds->queue->last_dts = ds->last_ret_dts;
3939             }
3940 
3941             update_seek_ranges(in->current_range);
3942         }
3943 
3944         start_ts -= 1.0; // small offset to get correct overlap
3945     }
3946 
3947     MP_VERBOSE(in, "refresh seek to %f\n", start_ts);
3948     in->seeking = true;
3949     in->seek_flags = SEEK_HR;
3950     in->seek_pts = start_ts;
3951 }
3952 
3953 // Set whether the given stream should return packets.
3954 // ref_pts is used only if the stream is enabled. Then it serves as approximate
3955 // start pts for this stream (in the worst case it is ignored).
demuxer_select_track(struct demuxer * demuxer,struct sh_stream * stream,double ref_pts,bool selected)3956 void demuxer_select_track(struct demuxer *demuxer, struct sh_stream *stream,
3957                           double ref_pts, bool selected)
3958 {
3959     struct demux_internal *in = demuxer->in;
3960     struct demux_stream *ds = stream->ds;
3961     pthread_mutex_lock(&in->lock);
3962     ref_pts = MP_ADD_PTS(ref_pts, -in->ts_offset);
3963     // don't flush buffers if stream is already selected / unselected
3964     if (ds->selected != selected) {
3965         MP_VERBOSE(in, "%sselect track %d\n", selected ? "" : "de", stream->index);
3966         ds->selected = selected;
3967         update_stream_selection_state(in, ds);
3968         in->tracks_switched = true;
3969         if (ds->selected) {
3970             if (in->back_demuxing)
3971                 ds->back_seek_pos = ref_pts;
3972             if (!in->after_seek)
3973                 initiate_refresh_seek(in, ds, ref_pts);
3974         }
3975         if (in->threading) {
3976             pthread_cond_signal(&in->wakeup);
3977         } else {
3978             execute_trackswitch(in);
3979         }
3980     }
3981     pthread_mutex_unlock(&in->lock);
3982 }
3983 
3984 // Execute a refresh seek on the given stream.
3985 // ref_pts has the same meaning as with demuxer_select_track()
demuxer_refresh_track(struct demuxer * demuxer,struct sh_stream * stream,double ref_pts)3986 void demuxer_refresh_track(struct demuxer *demuxer, struct sh_stream *stream,
3987                            double ref_pts)
3988 {
3989     struct demux_internal *in = demuxer->in;
3990     struct demux_stream *ds = stream->ds;
3991     pthread_mutex_lock(&in->lock);
3992     ref_pts = MP_ADD_PTS(ref_pts, -in->ts_offset);
3993     if (ds->selected) {
3994         MP_VERBOSE(in, "refresh track %d\n", stream->index);
3995         update_stream_selection_state(in, ds);
3996         if (in->back_demuxing)
3997             ds->back_seek_pos = ref_pts;
3998         if (!in->after_seek)
3999             initiate_refresh_seek(in, ds, ref_pts);
4000     }
4001     pthread_mutex_unlock(&in->lock);
4002 }
4003 
4004 // This is for demuxer implementations only. demuxer_select_track() sets the
4005 // logical state, while this function returns the actual state (in case the
4006 // demuxer attempts to cache even unselected packets for track switching - this
4007 // will potentially be done in the future).
demux_stream_is_selected(struct sh_stream * stream)4008 bool demux_stream_is_selected(struct sh_stream *stream)
4009 {
4010     if (!stream)
4011         return false;
4012     bool r = false;
4013     pthread_mutex_lock(&stream->ds->in->lock);
4014     r = stream->ds->selected;
4015     pthread_mutex_unlock(&stream->ds->in->lock);
4016     return r;
4017 }
4018 
demux_set_stream_wakeup_cb(struct sh_stream * sh,void (* cb)(void * ctx),void * ctx)4019 void demux_set_stream_wakeup_cb(struct sh_stream *sh,
4020                                 void (*cb)(void *ctx), void *ctx)
4021 {
4022     pthread_mutex_lock(&sh->ds->in->lock);
4023     sh->ds->wakeup_cb = cb;
4024     sh->ds->wakeup_cb_ctx = ctx;
4025     sh->ds->need_wakeup = true;
4026     pthread_mutex_unlock(&sh->ds->in->lock);
4027 }
4028 
// Append an attachment to demuxer->attachments. name, type and the data_size
// bytes at data are copied (as talloc children of the attachment array).
// Returns the index of the new attachment.
int demuxer_add_attachment(demuxer_t *demuxer, char *name, char *type,
                           void *data, size_t data_size)
{
    // The array is grown in steps of 32 entries.
    if (demuxer->num_attachments % 32 == 0) {
        demuxer->attachments = talloc_realloc(demuxer, demuxer->attachments,
                                              struct demux_attachment,
                                              demuxer->num_attachments + 32);
    }

    int index = demuxer->num_attachments;
    struct demux_attachment *entry = &demuxer->attachments[index];

    entry->name = talloc_strdup(demuxer->attachments, name);
    entry->type = talloc_strdup(demuxer->attachments, type);
    entry->data = talloc_memdup(demuxer->attachments, data, data_size);
    entry->data_size = data_size;

    demuxer->num_attachments = index + 1;
    return index;
}
4045 
chapter_compare(const void * p1,const void * p2)4046 static int chapter_compare(const void *p1, const void *p2)
4047 {
4048     struct demux_chapter *c1 = (void *)p1;
4049     struct demux_chapter *c2 = (void *)p2;
4050 
4051     if (c1->pts > c2->pts)
4052         return 1;
4053     else if (c1->pts < c2->pts)
4054         return -1;
4055     return c1->original_index > c2->original_index ? 1 :-1; // never equal
4056 }
4057 
// Sort demuxer->chapters in place using chapter_compare(). Does nothing if
// there are no chapters.
static void demuxer_sort_chapters(demuxer_t *demuxer)
{
    if (!demuxer->num_chapters)
        return;

    qsort(demuxer->chapters, demuxer->num_chapters,
          sizeof(struct demux_chapter), chapter_compare);
}
4065 
// Append a chapter starting at pts to demuxer->chapters. The chapter gets a
// fresh metadata tag list with "TITLE" set to name, and remembers its
// insertion position in original_index.
// Returns the index of the new chapter.
int demuxer_add_chapter(demuxer_t *demuxer, char *name,
                        double pts, uint64_t demuxer_id)
{
    struct mp_tags *tags = talloc_zero(demuxer, struct mp_tags);
    mp_tags_set_str(tags, "TITLE", name);

    struct demux_chapter chapter = {
        .original_index = demuxer->num_chapters,
        .pts = pts,
        .metadata = tags,
        .demuxer_id = demuxer_id,
    };

    MP_TARRAY_APPEND(demuxer, demuxer->chapters, demuxer->num_chapters, chapter);
    return demuxer->num_chapters - 1;
}
4079 
4080 // Disallow reading any packets and make readers think there is no new data
4081 // yet, until a seek is issued.
demux_block_reading(struct demuxer * demuxer,bool block)4082 void demux_block_reading(struct demuxer *demuxer, bool block)
4083 {
4084     struct demux_internal *in = demuxer->in;
4085     assert(demuxer == in->d_user);
4086 
4087     pthread_mutex_lock(&in->lock);
4088     in->blocked = block;
4089     for (int n = 0; n < in->num_streams; n++) {
4090         in->streams[n]->ds->need_wakeup = true;
4091         wakeup_ds(in->streams[n]->ds);
4092     }
4093     pthread_cond_signal(&in->wakeup);
4094     pthread_mutex_unlock(&in->lock);
4095 }
4096 
update_bytes_read(struct demux_internal * in)4097 static void update_bytes_read(struct demux_internal *in)
4098 {
4099     struct demuxer *demuxer = in->d_thread;
4100 
4101     int64_t new = in->slave_unbuffered_read_bytes;
4102     in->slave_unbuffered_read_bytes = 0;
4103 
4104     int64_t new_seeks = 0;
4105 
4106     struct stream *stream = demuxer->stream;
4107     if (stream) {
4108         new += stream->total_unbuffered_read_bytes;
4109         stream->total_unbuffered_read_bytes = 0;
4110         new_seeks += stream->total_stream_seeks;
4111         stream->total_stream_seeks = 0;
4112     }
4113 
4114     in->cache_unbuffered_read_bytes += new;
4115     in->hack_unbuffered_read_bytes += new;
4116     in->byte_level_seeks += new_seeks;
4117 }
4118 
4119 // must be called locked, temporarily unlocks
update_cache(struct demux_internal * in)4120 static void update_cache(struct demux_internal *in)
4121 {
4122     struct demuxer *demuxer = in->d_thread;
4123     struct stream *stream = demuxer->stream;
4124 
4125     int64_t now = mp_time_us();
4126     int64_t diff = now - in->last_speed_query;
4127     bool do_update = diff >= MP_SECOND_US;
4128 
4129     // Don't lock while querying the stream.
4130     pthread_mutex_unlock(&in->lock);
4131 
4132     int64_t stream_size = -1;
4133     struct mp_tags *stream_metadata = NULL;
4134     if (stream) {
4135         if (do_update)
4136             stream_size = stream_get_size(stream);
4137         stream_control(stream, STREAM_CTRL_GET_METADATA, &stream_metadata);
4138     }
4139 
4140     pthread_mutex_lock(&in->lock);
4141 
4142     update_bytes_read(in);
4143 
4144     if (do_update)
4145         in->stream_size = stream_size;
4146     if (stream_metadata) {
4147         add_timed_metadata(in, stream_metadata, NULL, MP_NOPTS_VALUE);
4148         talloc_free(stream_metadata);
4149     }
4150 
4151     in->next_cache_update = INT64_MAX;
4152 
4153     if (do_update) {
4154         uint64_t bytes = in->cache_unbuffered_read_bytes;
4155         in->cache_unbuffered_read_bytes = 0;
4156         in->last_speed_query = now;
4157         double speed = bytes / (diff / (double)MP_SECOND_US);
4158         in->bytes_per_second = 0.5 * in->speed_query_prev_sample +
4159                                0.5 * speed;
4160         in->speed_query_prev_sample = speed;
4161     }
4162     // The idea is to update as long as there is "activity".
4163     if (in->bytes_per_second)
4164         in->next_cache_update = now + MP_SECOND_US + 1;
4165 }
4166 
dumper_close(struct demux_internal * in)4167 static void dumper_close(struct demux_internal *in)
4168 {
4169     if (in->dumper)
4170         mp_recorder_destroy(in->dumper);
4171     in->dumper = NULL;
4172     if (in->dumper_status == CONTROL_TRUE)
4173         in->dumper_status = CONTROL_FALSE; // make abort equal to success
4174 }
4175 
range_time_compare(const void * p1,const void * p2)4176 static int range_time_compare(const void *p1, const void *p2)
4177 {
4178     struct demux_cached_range *r1 = *((struct demux_cached_range **)p1);
4179     struct demux_cached_range *r2 = *((struct demux_cached_range **)p2);
4180 
4181     if (r1->seek_start == r2->seek_start)
4182         return 0;
4183     return r1->seek_start < r2->seek_start ? -1 : 1;
4184 }
4185 
// Dump the packets currently held in the cache ranges that intersect
// [start, end) through in->dumper. end==MP_NOPTS_VALUE means there is no upper
// bound; in that case the dumper is left open on success (see the end of this
// function).
// Sets in->dumper_status: CONTROL_ERROR if there is no dumper or a packet
// could not be read back from the cache, CONTROL_TRUE otherwise (possibly
// changed by dumper_close()).
static void dump_cache(struct demux_internal *in, double start, double end)
{
    in->dumper_status = in->dumper ? CONTROL_TRUE : CONTROL_ERROR;
    if (!in->dumper)
        return;

    // Work on a copy of the range list, sorted by start time.
    // (only in pathological cases there might be more ranges than allowed)
    struct demux_cached_range *ranges[MAX_SEEK_RANGES];
    int num_ranges = 0;
    for (int n = 0; n < MPMIN(MP_ARRAY_SIZE(ranges), in->num_ranges); n++)
        ranges[num_ranges++] = in->ranges[n];
    qsort(ranges, num_ranges, sizeof(ranges[0]), range_time_compare);

    for (int n = 0; n < num_ranges; n++) {
        struct demux_cached_range *r = ranges[n];
        // Skip ranges without a valid start, and ranges which don't overlap
        // with the requested time span.
        if (r->seek_start == MP_NOPTS_VALUE)
            continue;
        if (r->seek_end <= start)
            continue;
        if (end != MP_NOPTS_VALUE && r->seek_start >= end)
            continue;

        // Every dumped range is preceded by a discontinuity marker.
        mp_recorder_mark_discontinuity(in->dumper);

        double pts = start;
        int flags = 0;
        adjust_cache_seek_target(in, r, &pts, &flags);

        // Per stream, determine the first packet to dump in this range.
        for (int i = 0; i < r->num_streams; i++) {
            struct demux_queue *q = r->streams[i];
            struct demux_stream *ds = q->ds;

            ds->dump_pos = find_seek_target(q, pts, flags);
        }

        // We need to reinterleave the separate streams somehow, which makes
        // everything more complex.
        // Each iteration picks one packet among the streams' current
        // dump_pos entries, advances that stream, and writes the packet.
        while (1) {
            struct demux_packet *next = NULL;
            double next_dts = MP_NOPTS_VALUE;

            for (int i = 0; i < r->num_streams; i++) {
                struct demux_stream *ds = r->streams[i]->ds;
                struct demux_packet *dp = ds->dump_pos;

                if (!dp)
                    continue;
                assert(dp->stream == ds->index);

                // Timestamp used for ordering (dts/pts combined via
                // MP_PTS_OR_DEF).
                double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts);

                // Check for stream EOF. Note that we don't try to EOF
                // streams at the same point (e.g. video can take longer
                // to finish than audio, so the output file will have no
                // audio for the last part of the video). Too much effort.
                if (pdts != MP_NOPTS_VALUE && end != MP_NOPTS_VALUE &&
                    pdts >= end && dp->keyframe)
                {
                    ds->dump_pos = NULL;
                    continue;
                }

                // Prefer the packet with the lowest timestamp. A packet
                // without timestamp always replaces the current candidate.
                if (pdts == MP_NOPTS_VALUE || next_dts == MP_NOPTS_VALUE ||
                    pdts < next_dts)
                {
                    next_dts = pdts;
                    next = dp;
                }
            }

            // All streams are finished for this range.
            if (!next)
                break;

            struct demux_stream *ds = in->streams[next->stream]->ds;
            ds->dump_pos = next->next;

            struct demux_packet *dp = read_packet_from_cache(in, next);
            if (!dp) {
                in->dumper_status = CONTROL_ERROR;
                break;
            }

            write_dump_packet(in, dp);

            talloc_free(dp);
        }

        // NOTE(review): the status was set to CONTROL_TRUE above, so this
        // only continues if CONTROL_TRUE == CONTROL_OK - confirm against the
        // CONTROL_* definitions.
        if (in->dumper_status != CONTROL_OK)
            break;
    }

    // (strictly speaking unnecessary; for clarity)
    for (int n = 0; n < in->num_streams; n++)
        in->streams[n]->ds->dump_pos = NULL;

    // If dumping (in end==NOPTS mode) doesn't continue at the range that
    // was written last, we have a discontinuity.
    if (num_ranges && ranges[num_ranges - 1] != in->current_range)
        mp_recorder_mark_discontinuity(in->dumper);

    // end=NOPTS means the demuxer output continues to be written to the
    // dump file.
    if (end != MP_NOPTS_VALUE || in->dumper_status != CONTROL_OK)
        dumper_close(in);
}
4291 
4292 // Set the current cache dumping mode. There is only at most 1 dump process
4293 // active, so calling this aborts the previous dumping. Passing file==NULL
4294 // stops dumping.
4295 // This is synchronous with demux_cache_dump_get_status() (i.e. starting or
4296 // aborting is not asynchronous). On status change, the demuxer wakeup callback
4297 // is invoked (except for this call).
4298 // Returns whether dumping was logically started.
demux_cache_dump_set(struct demuxer * demuxer,double start,double end,char * file)4299 bool demux_cache_dump_set(struct demuxer *demuxer, double start, double end,
4300                           char *file)
4301 {
4302     struct demux_internal *in = demuxer->in;
4303     assert(demuxer == in->d_user);
4304 
4305     bool res = false;
4306 
4307     pthread_mutex_lock(&in->lock);
4308 
4309     start = MP_ADD_PTS(start, -in->ts_offset);
4310     end = MP_ADD_PTS(end, -in->ts_offset);
4311 
4312     dumper_close(in);
4313 
4314     if (file && file[0] && start != MP_NOPTS_VALUE) {
4315         res = true;
4316 
4317         in->dumper = recorder_create(in, file);
4318 
4319         // This is not asynchronous and will freeze the shit for a while if the
4320         // user is unlucky. It could be moved to a thread with some effort.
4321         // General idea: iterate over all cache ranges, dump what intersects.
4322         // After that, and if the user requested it, make it dump all newly
4323         // received packets, even if it's awkward (consider the case if the
4324         // current range is not the last range).
4325         dump_cache(in, start, end);
4326     }
4327 
4328     pthread_mutex_unlock(&in->lock);
4329 
4330     return res;
4331 }
4332 
4333 // Returns one of CONTROL_*. CONTROL_TRUE means dumping is in progress.
demux_cache_dump_get_status(struct demuxer * demuxer)4334 int demux_cache_dump_get_status(struct demuxer *demuxer)
4335 {
4336     struct demux_internal *in = demuxer->in;
4337     pthread_mutex_lock(&in->lock);
4338     int status = in->dumper_status;
4339     pthread_mutex_unlock(&in->lock);
4340     return status;
4341 }
4342 
// Return what range demux_cache_dump_set() would (probably) yield. This is a
// conservative amount (in addition to internal consistency of this code, it
// depends on what a player will do with the resulting file).
// Use for_end==true to get the end of dumping, otherwise the start.
// Returns NOPTS if nothing was found (or if pts itself is NOPTS).
double demux_probe_cache_dump_target(struct demuxer *demuxer, double pts,
                                     bool for_end)
{
    struct demux_internal *in = demuxer->in;
    assert(demuxer == in->d_user);

    double res = MP_NOPTS_VALUE;
    if (pts == MP_NOPTS_VALUE)
        return pts;

    pthread_mutex_lock(&in->lock);

    pts = MP_ADD_PTS(pts, -in->ts_offset);

    // (When determining the end, look before the keyframe at pts, so subtract
    // an arbitrary amount to round down.)
    double seek_pts = for_end ? pts - 0.001 : pts;
    int flags = 0;
    struct demux_cached_range *r = find_cache_seek_range(in, seek_pts, flags);
    if (r) {
        if (!for_end)
            adjust_cache_seek_target(in, r, &pts, &flags);

        // Best candidate timestamp per stream type.
        double t[STREAM_TYPE_COUNT];
        for (int n = 0; n < STREAM_TYPE_COUNT; n++)
            t[n] = MP_NOPTS_VALUE;

        // NOTE(review): this indexes r->streams[] with the index used for
        // in->streams[], i.e. it assumes both arrays are parallel - confirm.
        for (int n = 0; n < in->num_streams; n++) {
            struct demux_stream *ds = in->streams[n]->ds;
            struct demux_queue *q = r->streams[n];

            struct demux_packet *dp = find_seek_target(q, pts, flags);
            if (dp) {
                if (for_end) {
                    // Walk forward until the first keyframe at or after pts,
                    // keeping the maximum timestamp seen before it.
                    while (dp) {
                        double pdts = MP_PTS_OR_DEF(dp->dts, dp->pts);

                        if (pdts != MP_NOPTS_VALUE && pdts >= pts && dp->keyframe)
                            break;

                        t[ds->type] = MP_PTS_MAX(t[ds->type], pdts);

                        dp = dp->next;
                    }
                } else {
                    // Use the start time computed for the seek target,
                    // combined with the range start via MP_PTS_MAX.
                    double start;
                    compute_keyframe_times(dp, &start, NULL);
                    start = MP_PTS_MAX(start, r->seek_start);
                    t[ds->type] = MP_PTS_MAX(t[ds->type], start);
                }
            }
        }

        // Prefer video, then audio, then the first stream type which has a
        // valid timestamp.
        res = t[STREAM_VIDEO];
        if (res == MP_NOPTS_VALUE)
            res = t[STREAM_AUDIO];
        if (res == MP_NOPTS_VALUE) {
            for (int n = 0; n < STREAM_TYPE_COUNT; n++) {
                res = t[n];
                if (res != MP_NOPTS_VALUE)
                    break;
            }
        }
    }

    // Undo the ts_offset subtraction applied to the input pts.
    res = MP_ADD_PTS(res, in->ts_offset);

    pthread_mutex_unlock(&in->lock);

    return res;
}
4419 
4420 // Used by demuxers to report the amount of transferred bytes. This is for
4421 // streams which circumvent demuxer->stream (stream statistics are handled by
4422 // demux.c itself).
demux_report_unbuffered_read_bytes(struct demuxer * demuxer,int64_t new)4423 void demux_report_unbuffered_read_bytes(struct demuxer *demuxer, int64_t new)
4424 {
4425     struct demux_internal *in = demuxer->in;
4426     assert(demuxer == in->d_thread);
4427 
4428     in->slave_unbuffered_read_bytes += new;
4429 }
4430 
4431 // Return bytes read since last query. It's a hack because it works only if
4432 // the demuxer thread is disabled.
demux_get_bytes_read_hack(struct demuxer * demuxer)4433 int64_t demux_get_bytes_read_hack(struct demuxer *demuxer)
4434 {
4435     struct demux_internal *in = demuxer->in;
4436 
4437     // Required because demuxer==in->d_user, and we access in->d_thread.
4438     // Locking won't solve this, because we also need to access struct stream.
4439     assert(!in->threading);
4440 
4441     update_bytes_read(in);
4442 
4443     int64_t res = in->hack_unbuffered_read_bytes;
4444     in->hack_unbuffered_read_bytes = 0;
4445     return res;
4446 }
4447 
demux_get_bitrate_stats(struct demuxer * demuxer,double * rates)4448 void demux_get_bitrate_stats(struct demuxer *demuxer, double *rates)
4449 {
4450     struct demux_internal *in = demuxer->in;
4451     assert(demuxer == in->d_user);
4452 
4453     pthread_mutex_lock(&in->lock);
4454 
4455     for (int n = 0; n < STREAM_TYPE_COUNT; n++)
4456         rates[n] = -1;
4457     for (int n = 0; n < in->num_streams; n++) {
4458         struct demux_stream *ds = in->streams[n]->ds;
4459         if (ds->selected && ds->bitrate >= 0)
4460             rates[ds->type] = MPMAX(0, rates[ds->type]) + ds->bitrate;
4461     }
4462 
4463     pthread_mutex_unlock(&in->lock);
4464 }
4465 
// Fill *r with a snapshot of the reader/cache state, taken under in->lock.
// Must be called on the user-facing demuxer (in->d_user). The reported
// timestamps (ts_reader, ts_end, seek ranges) have in->ts_offset added.
void demux_get_reader_state(struct demuxer *demuxer, struct demux_reader_state *r)
{
    struct demux_internal *in = demuxer->in;
    assert(demuxer == in->d_user);

    pthread_mutex_lock(&in->lock);

    // Fields not listed here (e.g. underrun, fw_bytes, num_seek_ranges) start
    // out zeroed and are computed below.
    *r = (struct demux_reader_state){
        .eof = in->eof,
        .ts_reader = MP_NOPTS_VALUE,
        .ts_end = MP_NOPTS_VALUE,
        .ts_duration = -1,
        .total_bytes = in->total_bytes,
        .seeking = in->seeking_in_progress,
        .low_level_seeks = in->low_level_seeks,
        .ts_last = in->demux_ts,
        .bytes_per_second = in->bytes_per_second,
        .byte_level_seeks = in->byte_level_seeks,
        .file_cache_bytes = in->cache ? demux_cache_get_size(in->cache) : -1,
    };
    bool any_packets = false;
    for (int n = 0; n < in->num_streams; n++) {
        struct demux_stream *ds = in->streams[n]->ds;
        // Only eager streams contribute to underrun and timestamps; streams
        // with an empty queue at EOF, or with ignore_eof set, are left out.
        if (ds->eager && !(!ds->queue->head && ds->eof) && !ds->ignore_eof) {
            r->underrun |= !ds->reader_head && !ds->eof && !ds->still_image;
            r->ts_reader = MP_PTS_MAX(r->ts_reader, ds->base_ts);
            r->ts_end = MP_PTS_MAX(r->ts_end, ds->queue->last_ts);
            any_packets |= !!ds->reader_head;
        }
        // Forward buffered bytes are summed over all streams.
        r->fw_bytes += get_foward_buffered_bytes(ds);
    }
    r->idle = (!in->reading && !r->underrun) || r->eof;
    // An underrun is only reported while not idle and with threading enabled.
    r->underrun &= !r->idle && in->threading;
    r->ts_reader = MP_ADD_PTS(r->ts_reader, in->ts_offset);
    r->ts_end = MP_ADD_PTS(r->ts_end, in->ts_offset);
    if (r->ts_reader != MP_NOPTS_VALUE && r->ts_reader <= r->ts_end)
        r->ts_duration = r->ts_end - r->ts_reader;
    // While a seek is pending, or if no considered stream has a packet at its
    // reader position, the duration is reported as 0.
    if (in->seeking || !any_packets)
        r->ts_duration = 0;
    // Export the seekable cache ranges (at most MAX_SEEK_RANGES), skipping
    // ranges without a valid start time.
    for (int n = 0; n < MPMIN(in->num_ranges, MAX_SEEK_RANGES); n++) {
        struct demux_cached_range *range = in->ranges[n];
        if (range->seek_start != MP_NOPTS_VALUE) {
            r->seek_ranges[r->num_seek_ranges++] =
                (struct demux_seek_range){
                    .start = MP_ADD_PTS(range->seek_start, in->ts_offset),
                    .end = MP_ADD_PTS(range->seek_end, in->ts_offset),
                };
            r->bof_cached |= range->is_bof;
            r->eof_cached |= range->is_eof;
        }
    }

    pthread_mutex_unlock(&in->lock);
}
4520 
demux_cancel_test(struct demuxer * demuxer)4521 bool demux_cancel_test(struct demuxer *demuxer)
4522 {
4523     return mp_cancel_test(demuxer->cancel);
4524 }
4525 
demux_copy_chapter_data(struct demux_chapter * c,int num)4526 struct demux_chapter *demux_copy_chapter_data(struct demux_chapter *c, int num)
4527 {
4528     struct demux_chapter *new = talloc_array(NULL, struct demux_chapter, num);
4529     for (int n = 0; n < num; n++) {
4530         new[n] = c[n];
4531         new[n].metadata = mp_tags_dup(new, new[n].metadata);
4532     }
4533     return new;
4534 }
4535 
visit_tags(void * ctx,void (* visit)(void * ctx,void * ta,char ** s),struct mp_tags * tags)4536 static void visit_tags(void *ctx, void (*visit)(void *ctx, void *ta, char **s),
4537                        struct mp_tags *tags)
4538 {
4539     for (int n = 0; n < (tags ? tags->num_keys : 0); n++)
4540         visit(ctx, tags, &tags->values[n]);
4541 }
4542 
visit_meta(struct demuxer * demuxer,void * ctx,void (* visit)(void * ctx,void * ta,char ** s))4543 static void visit_meta(struct demuxer *demuxer, void *ctx,
4544                        void (*visit)(void *ctx, void *ta, char **s))
4545 {
4546     struct demux_internal *in = demuxer->in;
4547 
4548     for (int n = 0; n < in->num_streams; n++) {
4549         struct sh_stream *sh = in->streams[n];
4550 
4551         visit(ctx, sh, &sh->title);
4552         visit_tags(ctx, visit, sh->tags);
4553     }
4554 
4555     for (int n = 0; n < demuxer->num_chapters; n++)
4556         visit_tags(ctx, visit, demuxer->chapters[n].metadata);
4557 
4558     visit_tags(ctx, visit, demuxer->metadata);
4559 }
4560 
4561 
// visit_meta() callback: ctx points to a talloc'ed string buffer, to which
// each non-NULL string is appended, followed by a newline.
static void visit_detect(void *ctx, void *ta, char **s)
{
    if (!*s)
        return;

    char **buffer = ctx;
    *buffer = talloc_asprintf_append_buffer(*buffer, "%s\n", *s);
}
4569 
visit_convert(void * ctx,void * ta,char ** s)4570 static void visit_convert(void *ctx, void *ta, char **s)
4571 {
4572     struct demuxer *demuxer = ctx;
4573     struct demux_internal *in = demuxer->in;
4574 
4575     if (!*s)
4576         return;
4577 
4578     bstr data = bstr0(*s);
4579     bstr conv = mp_iconv_to_utf8(in->log, data, in->meta_charset,
4580                                  MP_ICONV_VERBOSE);
4581     if (conv.start && conv.start != data.start) {
4582         char *ns = conv.start; // 0-termination is guaranteed
4583         // (The old string might not be an alloc, but if it is, it's a talloc
4584         // child, and will not leak, even if it stays allocated uselessly.)
4585         *s = ns;
4586         talloc_steal(ta, *s);
4587     }
4588 }
4589 
demux_convert_tags_charset(struct demuxer * demuxer)4590 static void demux_convert_tags_charset(struct demuxer *demuxer)
4591 {
4592     struct demux_internal *in = demuxer->in;
4593 
4594     char *cp = in->opts->meta_cp;
4595     if (!cp || mp_charset_is_utf8(cp))
4596         return;
4597 
4598     char *data = talloc_strdup(NULL, "");
4599     visit_meta(demuxer, &data, visit_detect);
4600 
4601     in->meta_charset = (char *)mp_charset_guess(in, in->log, bstr0(data), cp, 0);
4602     if (in->meta_charset && !mp_charset_is_utf8(in->meta_charset)) {
4603         MP_INFO(demuxer, "Using tag charset: %s\n", in->meta_charset);
4604         visit_meta(demuxer, demuxer, visit_convert);
4605     }
4606 
4607     talloc_free(data);
4608 }
4609 
get_demux_sub_opts(int index,const struct m_sub_options ** sub)4610 static bool get_demux_sub_opts(int index, const struct m_sub_options **sub)
4611 {
4612     if (!demuxer_list[index])
4613         return false;
4614     *sub = demuxer_list[index]->options;
4615     return true;
4616 }
4617