1 use crate::range_set::{Range, RangeSet};
2 use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
3 use bytes::Bytes;
4 use futures::sync::{mpsc, oneshot};
5 use futures::Stream;
6 use futures::{Async, Future, Poll};
7 use std::cmp::{max, min};
8 use std::fs;
9 use std::io::{self, Read, Seek, SeekFrom, Write};
10 use std::sync::{Arc, Condvar, Mutex};
11 use std::time::{Duration, Instant};
12 use tempfile::NamedTempFile;
13
14 use futures::sync::mpsc::unbounded;
15 use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
16 use librespot_core::session::Session;
17 use librespot_core::spotify_id::FileId;
18 use std::sync::atomic;
19 use std::sync::atomic::AtomicUsize;
20
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
// The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file.
// Note: smaller requests can happen if part of the block is downloaded already.

const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16;
// The amount of data that is requested when initially opening a file.
// Note: if the file is opened to play from the beginning, the amount of data to
// read ahead is requested in addition to this amount. If the file is opened to seek to
// another position, then only this amount is requested on the first request.

const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;
// The ping time that is used for calculations before a ping time was actually measured.

const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;
// If the measured ping time to the Spotify server is larger than this value, it is capped
// to avoid run-away block sizes and pre-fetching.

pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;
// Before playback starts, this many seconds of data must be present.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;
// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Both READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
// obeyed.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0;
// While playing back, this many seconds of data ahead of the current read position are
// requested.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0;
// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;
// If the amount of data that is pending (requested but not received) is less than a certain amount,
// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
// data is calculated as
// <pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>

const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;
// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account.
// The formula used is
// <pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>
// This mechanism allows for fast downloading of the remainder of the file. The number should be larger
// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster
// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is
// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.

const MAX_PREFETCH_REQUESTS: usize = 4;
// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending
// requests share bandwidth. Thus, having too many requests can lead to the one that is needed next
// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new
// pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are pending.
85
/// An audio file that can be read and seeked, backed either by a file taken
/// from the session cache or by a file that is downloaded while it is read.
pub enum AudioFile {
    /// The file was found in the cache; reads and seeks go straight to it.
    Cached(fs::File),
    /// The file is being downloaded; reads block until the data is present.
    Streaming(AudioFileStreaming),
}
90
/// Future returned by `AudioFile::open` that resolves to an `AudioFile`.
pub enum AudioFileOpen {
    /// Cached file handle; the `Option` is emptied when the future is polled.
    Cached(Option<fs::File>),
    /// Pending open of a file that has to be downloaded.
    Streaming(AudioFileOpenStreaming),
}
95
/// State of a streaming open that is waiting for the file-size header of the
/// initial range request.
pub struct AudioFileOpenStreaming {
    // Session used to spawn the fetcher task.
    session: Session,
    // Data half of the initial request's channel; taken when the open finishes.
    initial_data_rx: Option<ChannelData>,
    // Number of bytes asked for by the initial request; taken when the open finishes.
    initial_data_length: Option<usize>,
    // Timestamp recorded in `AudioFile::open` and handed to the first data receiver.
    initial_request_sent_time: Instant,
    // Header half of the initial request's channel; polled for the header with id 0x3.
    headers: ChannelHeaders,
    file_id: FileId,
    // Sender handed to the fetcher, which uses it to pass on the completed temp file.
    complete_tx: Option<oneshot::Sender<NamedTempFile>>,
    // The `bytes_per_second` value given to `AudioFile::open`.
    streaming_data_rate: usize,
}
106
/// Commands sent over the unbounded channel to the `AudioFileFetch` task.
enum StreamLoaderCommand {
    Fetch(Range),       // signal the stream loader to fetch a range of the file
    RandomAccessMode(), // optimise download strategy for random access
    StreamMode(),       // optimise download strategy for streaming
    Close(),            // terminate and don't load any more data
}
113
/// Cloneable handle for querying and steering the download of an `AudioFile`.
///
/// For cached files both optional fields are `None`, so commands are dropped
/// and the availability queries only consult `file_size`.
#[derive(Clone)]
pub struct StreamLoaderController {
    // Command channel to the fetcher; `None` for cached files.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    // Download state shared with the fetcher; `None` for cached files.
    stream_shared: Option<Arc<AudioFileShared>>,
    // Total size of the file in bytes.
    file_size: usize,
}
120
121 impl StreamLoaderController {
len(&self) -> usize122 pub fn len(&self) -> usize {
123 return self.file_size;
124 }
125
range_available(&self, range: Range) -> bool126 pub fn range_available(&self, range: Range) -> bool {
127 if let Some(ref shared) = self.stream_shared {
128 let download_status = shared.download_status.lock().unwrap();
129 if range.length
130 <= download_status
131 .downloaded
132 .contained_length_from_value(range.start)
133 {
134 return true;
135 } else {
136 return false;
137 }
138 } else {
139 if range.length <= self.len() - range.start {
140 return true;
141 } else {
142 return false;
143 }
144 }
145 }
146
range_to_end_available(&self) -> bool147 pub fn range_to_end_available(&self) -> bool {
148 if let Some(ref shared) = self.stream_shared {
149 let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
150 self.range_available(Range::new(read_position, self.len() - read_position))
151 } else {
152 true
153 }
154 }
155
ping_time_ms(&self) -> usize156 pub fn ping_time_ms(&self) -> usize {
157 if let Some(ref shared) = self.stream_shared {
158 return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
159 } else {
160 return 0;
161 }
162 }
163
send_stream_loader_command(&mut self, command: StreamLoaderCommand)164 fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
165 if let Some(ref mut channel) = self.channel_tx {
166 // ignore the error in case the channel has been closed already.
167 let _ = channel.unbounded_send(command);
168 }
169 }
170
fetch(&mut self, range: Range)171 pub fn fetch(&mut self, range: Range) {
172 // signal the stream loader to fetch a range of the file
173 self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
174 }
175
fetch_blocking(&mut self, mut range: Range)176 pub fn fetch_blocking(&mut self, mut range: Range) {
177 // signal the stream loader to tech a range of the file and block until it is loaded.
178
179 // ensure the range is within the file's bounds.
180 if range.start >= self.len() {
181 range.length = 0;
182 } else if range.end() > self.len() {
183 range.length = self.len() - range.start;
184 }
185
186 self.fetch(range);
187
188 if let Some(ref shared) = self.stream_shared {
189 let mut download_status = shared.download_status.lock().unwrap();
190 while range.length
191 > download_status
192 .downloaded
193 .contained_length_from_value(range.start)
194 {
195 download_status = shared
196 .cond
197 .wait_timeout(download_status, Duration::from_millis(1000))
198 .unwrap()
199 .0;
200 if range.length
201 > (download_status
202 .downloaded
203 .union(&download_status.requested)
204 .contained_length_from_value(range.start))
205 {
206 // For some reason, the requested range is neither downloaded nor requested.
207 // This could be due to a network error. Request it again.
208 // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
209 if let Some(ref mut channel) = self.channel_tx {
210 // ignore the error in case the channel has been closed already.
211 let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
212 }
213 }
214 }
215 }
216 }
217
fetch_next(&mut self, length: usize)218 pub fn fetch_next(&mut self, length: usize) {
219 let range: Range = if let Some(ref shared) = self.stream_shared {
220 Range {
221 start: shared.read_position.load(atomic::Ordering::Relaxed),
222 length: length,
223 }
224 } else {
225 return;
226 };
227 self.fetch(range);
228 }
229
fetch_next_blocking(&mut self, length: usize)230 pub fn fetch_next_blocking(&mut self, length: usize) {
231 let range: Range = if let Some(ref shared) = self.stream_shared {
232 Range {
233 start: shared.read_position.load(atomic::Ordering::Relaxed),
234 length: length,
235 }
236 } else {
237 return;
238 };
239 self.fetch_blocking(range);
240 }
241
set_random_access_mode(&mut self)242 pub fn set_random_access_mode(&mut self) {
243 // optimise download strategy for random access
244 self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
245 }
246
set_stream_mode(&mut self)247 pub fn set_stream_mode(&mut self) {
248 // optimise download strategy for streaming
249 self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
250 }
251
close(&mut self)252 pub fn close(&mut self) {
253 // terminate stream loading and don't load any more data for this file.
254 self.send_stream_loader_command(StreamLoaderCommand::Close());
255 }
256 }
257
/// Reader for a file that is still being downloaded into a temporary file.
pub struct AudioFileStreaming {
    // Read handle obtained by reopening the temp file the fetcher writes to.
    read_file: fs::File,

    // Current read offset in bytes, updated by `read` and `seek`.
    position: u64,

    // Channel used by `read` to ask the fetcher for missing ranges.
    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,

    // Download state shared with the fetcher and the controllers.
    shared: Arc<AudioFileShared>,
}
267
/// Byte ranges of the file that have been asked for and that have arrived.
struct AudioFileDownloadStatus {
    // Ranges for which a request was sent; a receiver removes whatever part of
    // its request was still outstanding when it finishes.
    requested: RangeSet,
    // Ranges whose data has been written to the temp file.
    downloaded: RangeSet,
}
272
/// How aggressively data is requested beyond what a read strictly needs.
#[derive(Copy, Clone)]
enum DownloadStrategy {
    /// Reads request only the bytes they need; the fetcher does not pre-fetch.
    RandomAccess(),
    /// Reads request additional read-ahead data and the fetcher pre-fetches.
    Streaming(),
}
278
/// State shared between the reader, the controllers, the fetcher and the data
/// receivers of one streaming file.
struct AudioFileShared {
    file_id: FileId,
    // Total file size in bytes, as derived from the 0x3 header.
    file_size: usize,
    // Nominal data rate in bytes per second, as given to `AudioFile::open`.
    stream_data_rate: usize,
    // Notified whenever `download_status` changes; used together with that mutex.
    cond: Condvar,
    download_status: Mutex<AudioFileDownloadStatus>,
    download_strategy: Mutex<DownloadStrategy>,
    // Number of data receivers that have been created and not yet finished.
    number_of_open_requests: AtomicUsize,
    // Latest ping time estimate in milliseconds; 0 until the first measurement.
    ping_time_ms: AtomicUsize,
    // Reader's byte position as last stored by `read` or `seek`.
    read_position: AtomicUsize,
}
290
291 impl AudioFileOpenStreaming {
finish(&mut self, size: usize) -> AudioFileStreaming292 fn finish(&mut self, size: usize) -> AudioFileStreaming {
293 let shared = Arc::new(AudioFileShared {
294 file_id: self.file_id,
295 file_size: size,
296 stream_data_rate: self.streaming_data_rate,
297 cond: Condvar::new(),
298 download_status: Mutex::new(AudioFileDownloadStatus {
299 requested: RangeSet::new(),
300 downloaded: RangeSet::new(),
301 }),
302 download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
303 number_of_open_requests: AtomicUsize::new(0),
304 ping_time_ms: AtomicUsize::new(0),
305 read_position: AtomicUsize::new(0),
306 });
307
308 let mut write_file = NamedTempFile::new().unwrap();
309 write_file.as_file().set_len(size as u64).unwrap();
310 write_file.seek(SeekFrom::Start(0)).unwrap();
311
312 let read_file = write_file.reopen().unwrap();
313
314 let initial_data_rx = self.initial_data_rx.take().unwrap();
315 let initial_data_length = self.initial_data_length.take().unwrap();
316 let complete_tx = self.complete_tx.take().unwrap();
317 //let (seek_tx, seek_rx) = mpsc::unbounded();
318 let (stream_loader_command_tx, stream_loader_command_rx) =
319 mpsc::unbounded::<StreamLoaderCommand>();
320
321 let fetcher = AudioFileFetch::new(
322 self.session.clone(),
323 shared.clone(),
324 initial_data_rx,
325 self.initial_request_sent_time,
326 initial_data_length,
327 write_file,
328 stream_loader_command_rx,
329 complete_tx,
330 );
331 self.session.spawn(move |_| fetcher);
332
333 AudioFileStreaming {
334 read_file: read_file,
335
336 position: 0,
337 //seek: seek_tx,
338 stream_loader_command_tx: stream_loader_command_tx,
339
340 shared: shared,
341 }
342 }
343 }
344
345 impl Future for AudioFileOpen {
346 type Item = AudioFile;
347 type Error = ChannelError;
348
poll(&mut self) -> Poll<AudioFile, ChannelError>349 fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
350 match *self {
351 AudioFileOpen::Streaming(ref mut open) => {
352 let file = try_ready!(open.poll());
353 Ok(Async::Ready(AudioFile::Streaming(file)))
354 }
355 AudioFileOpen::Cached(ref mut file) => {
356 let file = file.take().unwrap();
357 Ok(Async::Ready(AudioFile::Cached(file)))
358 }
359 }
360 }
361 }
362
363 impl Future for AudioFileOpenStreaming {
364 type Item = AudioFileStreaming;
365 type Error = ChannelError;
366
poll(&mut self) -> Poll<AudioFileStreaming, ChannelError>367 fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
368 loop {
369 let (id, data) = try_ready!(self.headers.poll()).unwrap();
370
371 if id == 0x3 {
372 let size = BigEndian::read_u32(&data) as usize * 4;
373 let file = self.finish(size);
374
375 return Ok(Async::Ready(file));
376 }
377 }
378 }
379 }
380
381 impl AudioFile {
open( session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool, ) -> AudioFileOpen382 pub fn open(
383 session: &Session,
384 file_id: FileId,
385 bytes_per_second: usize,
386 play_from_beginning: bool,
387 ) -> AudioFileOpen {
388 let cache = session.cache().cloned();
389
390 if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
391 debug!("File {} already in cache", file_id);
392 return AudioFileOpen::Cached(Some(file));
393 }
394
395 debug!("Downloading file {}", file_id);
396
397 let (complete_tx, complete_rx) = oneshot::channel();
398 let mut initial_data_length = if play_from_beginning {
399 INITIAL_DOWNLOAD_SIZE
400 + max(
401 (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
402 (INITIAL_PING_TIME_ESTIMATE_SECONDS
403 * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
404 * bytes_per_second as f64) as usize,
405 )
406 } else {
407 INITIAL_DOWNLOAD_SIZE
408 };
409 if initial_data_length % 4 != 0 {
410 initial_data_length += 4 - (initial_data_length % 4);
411 }
412 let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
413
414 let open = AudioFileOpenStreaming {
415 session: session.clone(),
416 file_id: file_id,
417
418 headers: headers,
419 initial_data_rx: Some(data),
420 initial_data_length: Some(initial_data_length),
421 initial_request_sent_time: Instant::now(),
422
423 complete_tx: Some(complete_tx),
424 streaming_data_rate: bytes_per_second,
425 };
426
427 let session_ = session.clone();
428 session.spawn(move |_| {
429 complete_rx
430 .map(move |mut file| {
431 if let Some(cache) = session_.cache() {
432 cache.save_file(file_id, &mut file);
433 debug!("File {} complete, saving to cache", file_id);
434 } else {
435 debug!("File {} complete", file_id);
436 }
437 })
438 .or_else(|oneshot::Canceled| Ok(()))
439 });
440
441 return AudioFileOpen::Streaming(open);
442 }
443
get_stream_loader_controller(&self) -> StreamLoaderController444 pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
445 match self {
446 AudioFile::Streaming(ref stream) => {
447 return StreamLoaderController {
448 channel_tx: Some(stream.stream_loader_command_tx.clone()),
449 stream_shared: Some(stream.shared.clone()),
450 file_size: stream.shared.file_size,
451 };
452 }
453 AudioFile::Cached(ref file) => {
454 return StreamLoaderController {
455 channel_tx: None,
456 stream_shared: None,
457 file_size: file.metadata().unwrap().len() as usize,
458 };
459 }
460 }
461 }
462 }
463
request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel464 fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
465 assert!(
466 offset % 4 == 0,
467 "Range request start positions must be aligned by 4 bytes."
468 );
469 assert!(
470 length % 4 == 0,
471 "Range request range lengths must be aligned by 4 bytes."
472 );
473 let start = offset / 4;
474 let end = (offset + length) / 4;
475
476 let (id, channel) = session.channel().allocate();
477
478 let mut data: Vec<u8> = Vec::new();
479 data.write_u16::<BigEndian>(id).unwrap();
480 data.write_u8(0).unwrap();
481 data.write_u8(1).unwrap();
482 data.write_u16::<BigEndian>(0x0000).unwrap();
483 data.write_u32::<BigEndian>(0x00000000).unwrap();
484 data.write_u32::<BigEndian>(0x00009C40).unwrap();
485 data.write_u32::<BigEndian>(0x00020000).unwrap();
486 data.write(&file.0).unwrap();
487 data.write_u32::<BigEndian>(start as u32).unwrap();
488 data.write_u32::<BigEndian>(end as u32).unwrap();
489
490 session.send_packet(0x8, data);
491
492 channel
493 }
494
/// A chunk of file data together with the byte offset it belongs at.
struct PartialFileData {
    // Byte offset in the file at which `data` starts.
    offset: usize,
    data: Bytes,
}
499
/// Messages sent from the data receivers to the `AudioFileFetch` task.
enum ReceivedData {
    /// Time in milliseconds between sending a request and its first data.
    ResponseTimeMs(usize),
    /// A chunk of downloaded file data.
    Data(PartialFileData),
}
504
/// Future that receives the data of one range request and forwards it to the
/// fetcher.
struct AudioFileFetchDataReceiver {
    shared: Arc<AudioFileShared>,
    // Channel to the fetcher for ping measurements and data chunks.
    file_data_tx: mpsc::UnboundedSender<ReceivedData>,
    // Stream of data chunks belonging to this request.
    data_rx: ChannelData,
    // Offset and length of the request as originally issued; only used in log messages.
    initial_data_offset: usize,
    initial_request_length: usize,
    // Offset at which the next received chunk belongs.
    data_offset: usize,
    // Number of requested bytes that have not been received yet.
    request_length: usize,
    request_sent_time: Option<Instant>,
    // Whether the first chunk of this request is used to measure the ping time.
    measure_ping_time: bool,
}
516
517 impl AudioFileFetchDataReceiver {
new( shared: Arc<AudioFileShared>, file_data_tx: mpsc::UnboundedSender<ReceivedData>, data_rx: ChannelData, data_offset: usize, request_length: usize, request_sent_time: Instant, ) -> AudioFileFetchDataReceiver518 fn new(
519 shared: Arc<AudioFileShared>,
520 file_data_tx: mpsc::UnboundedSender<ReceivedData>,
521 data_rx: ChannelData,
522 data_offset: usize,
523 request_length: usize,
524 request_sent_time: Instant,
525 ) -> AudioFileFetchDataReceiver {
526 let measure_ping_time = shared
527 .number_of_open_requests
528 .load(atomic::Ordering::SeqCst)
529 == 0;
530
531 shared
532 .number_of_open_requests
533 .fetch_add(1, atomic::Ordering::SeqCst);
534
535 AudioFileFetchDataReceiver {
536 shared: shared,
537 data_rx: data_rx,
538 file_data_tx: file_data_tx,
539 initial_data_offset: data_offset,
540 initial_request_length: request_length,
541 data_offset: data_offset,
542 request_length: request_length,
543 request_sent_time: Some(request_sent_time),
544 measure_ping_time: measure_ping_time,
545 }
546 }
547 }
548
549 impl AudioFileFetchDataReceiver {
finish(&mut self)550 fn finish(&mut self) {
551 if self.request_length > 0 {
552 let missing_range = Range::new(self.data_offset, self.request_length);
553
554 let mut download_status = self.shared.download_status.lock().unwrap();
555 download_status.requested.subtract_range(&missing_range);
556 self.shared.cond.notify_all();
557 }
558
559 self.shared
560 .number_of_open_requests
561 .fetch_sub(1, atomic::Ordering::SeqCst);
562 }
563 }
564
565 impl Future for AudioFileFetchDataReceiver {
566 type Item = ();
567 type Error = ();
568
poll(&mut self) -> Poll<(), ()>569 fn poll(&mut self) -> Poll<(), ()> {
570 loop {
571 match self.data_rx.poll() {
572 Ok(Async::Ready(Some(data))) => {
573 if self.measure_ping_time {
574 if let Some(request_sent_time) = self.request_sent_time {
575 let duration = Instant::now() - request_sent_time;
576 let duration_ms: u64;
577 if 0.001 * (duration.as_millis() as f64)
578 > MAXIMUM_ASSUMED_PING_TIME_SECONDS
579 {
580 duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
581 } else {
582 duration_ms = duration.as_millis() as u64;
583 }
584 let _ = self
585 .file_data_tx
586 .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
587 self.measure_ping_time = false;
588 }
589 }
590 let data_size = data.len();
591 let _ = self
592 .file_data_tx
593 .unbounded_send(ReceivedData::Data(PartialFileData {
594 offset: self.data_offset,
595 data: data,
596 }));
597 self.data_offset += data_size;
598 if self.request_length < data_size {
599 warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
600 self.request_length = 0;
601 } else {
602 self.request_length -= data_size;
603 }
604 if self.request_length == 0 {
605 self.finish();
606 return Ok(Async::Ready(()));
607 }
608 }
609 Ok(Async::Ready(None)) => {
610 if self.request_length > 0 {
611 warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
612 }
613 self.finish();
614 return Ok(Async::Ready(()));
615 }
616 Ok(Async::NotReady) => {
617 return Ok(Async::NotReady);
618 }
619 Err(ChannelError) => {
620 warn!(
621 "Error from channel for data receiver for range {} (+{}).",
622 self.initial_data_offset, self.initial_request_length
623 );
624 self.finish();
625 return Ok(Async::Ready(()));
626 }
627 }
628 }
629 }
630 }
631
/// Task that owns the temp file being written, processes stream loader
/// commands and stores the data delivered by the data receivers.
struct AudioFileFetch {
    session: Session,
    shared: Arc<AudioFileShared>,
    // Temp file that downloaded data is written to; taken when the download completes.
    output: Option<NamedTempFile>,

    // Sender cloned into every data receiver.
    file_data_tx: mpsc::UnboundedSender<ReceivedData>,
    file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,

    stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
    // Used once to hand the completed temp file on; taken when the download completes.
    complete_tx: Option<oneshot::Sender<NamedTempFile>>,
    // The most recent ping measurements (at most three are kept).
    network_response_times_ms: Vec<usize>,
}
644
645 impl AudioFileFetch {
new( session: Session, shared: Arc<AudioFileShared>, initial_data_rx: ChannelData, initial_request_sent_time: Instant, initial_data_length: usize, output: NamedTempFile, stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>, complete_tx: oneshot::Sender<NamedTempFile>, ) -> AudioFileFetch646 fn new(
647 session: Session,
648 shared: Arc<AudioFileShared>,
649 initial_data_rx: ChannelData,
650 initial_request_sent_time: Instant,
651 initial_data_length: usize,
652
653 output: NamedTempFile,
654 stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
655 complete_tx: oneshot::Sender<NamedTempFile>,
656 ) -> AudioFileFetch {
657 let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
658
659 {
660 let requested_range = Range::new(0, initial_data_length);
661 let mut download_status = shared.download_status.lock().unwrap();
662 download_status.requested.add_range(&requested_range);
663 }
664
665 let initial_data_receiver = AudioFileFetchDataReceiver::new(
666 shared.clone(),
667 file_data_tx.clone(),
668 initial_data_rx,
669 0,
670 initial_data_length,
671 initial_request_sent_time,
672 );
673
674 session.spawn(move |_| initial_data_receiver);
675
676 AudioFileFetch {
677 session: session,
678 shared: shared,
679 output: Some(output),
680
681 file_data_tx: file_data_tx,
682 file_data_rx: file_data_rx,
683
684 stream_loader_command_rx: stream_loader_command_rx,
685 complete_tx: Some(complete_tx),
686 network_response_times_ms: Vec::new(),
687 }
688 }
689
get_download_strategy(&mut self) -> DownloadStrategy690 fn get_download_strategy(&mut self) -> DownloadStrategy {
691 *(self.shared.download_strategy.lock().unwrap())
692 }
693
download_range(&mut self, mut offset: usize, mut length: usize)694 fn download_range(&mut self, mut offset: usize, mut length: usize) {
695 if length < MINIMUM_DOWNLOAD_SIZE {
696 length = MINIMUM_DOWNLOAD_SIZE;
697 }
698
699 // ensure the values are within the bounds and align them by 4 for the spotify protocol.
700 if offset >= self.shared.file_size {
701 return;
702 }
703
704 if length <= 0 {
705 return;
706 }
707
708 if offset + length > self.shared.file_size {
709 length = self.shared.file_size - offset;
710 }
711
712 if offset % 4 != 0 {
713 length += offset % 4;
714 offset -= offset % 4;
715 }
716
717 if length % 4 != 0 {
718 length += 4 - (length % 4);
719 }
720
721 let mut ranges_to_request = RangeSet::new();
722 ranges_to_request.add_range(&Range::new(offset, length));
723
724 let mut download_status = self.shared.download_status.lock().unwrap();
725
726 ranges_to_request.subtract_range_set(&download_status.downloaded);
727 ranges_to_request.subtract_range_set(&download_status.requested);
728
729 for range in ranges_to_request.iter() {
730 let (_headers, data) = request_range(
731 &self.session,
732 self.shared.file_id,
733 range.start,
734 range.length,
735 )
736 .split();
737
738 download_status.requested.add_range(range);
739
740 let receiver = AudioFileFetchDataReceiver::new(
741 self.shared.clone(),
742 self.file_data_tx.clone(),
743 data,
744 range.start,
745 range.length,
746 Instant::now(),
747 );
748
749 self.session.spawn(move |_| receiver);
750 }
751 }
752
pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize)753 fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) {
754 let mut bytes_to_go = bytes;
755 let mut requests_to_go = max_requests_to_send;
756
757 while bytes_to_go > 0 && requests_to_go > 0 {
758 // determine what is still missing
759 let mut missing_data = RangeSet::new();
760 missing_data.add_range(&Range::new(0, self.shared.file_size));
761 {
762 let download_status = self.shared.download_status.lock().unwrap();
763 missing_data.subtract_range_set(&download_status.downloaded);
764 missing_data.subtract_range_set(&download_status.requested);
765 }
766
767 // download data from after the current read position first
768 let mut tail_end = RangeSet::new();
769 let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
770 tail_end.add_range(&Range::new(
771 read_position,
772 self.shared.file_size - read_position,
773 ));
774 let tail_end = tail_end.intersection(&missing_data);
775
776 if !tail_end.is_empty() {
777 let range = tail_end.get_range(0);
778 let offset = range.start;
779 let length = min(range.length, bytes_to_go);
780 self.download_range(offset, length);
781 requests_to_go -= 1;
782 bytes_to_go -= length;
783 } else if !missing_data.is_empty() {
784 // ok, the tail is downloaded, download something fom the beginning.
785 let range = missing_data.get_range(0);
786 let offset = range.start;
787 let length = min(range.length, bytes_to_go);
788 self.download_range(offset, length);
789 requests_to_go -= 1;
790 bytes_to_go -= length;
791 } else {
792 return;
793 }
794 }
795 }
796
poll_file_data_rx(&mut self) -> Poll<(), ()>797 fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
798 loop {
799 match self.file_data_rx.poll() {
800 Ok(Async::Ready(None)) => {
801 return Ok(Async::Ready(()));
802 }
803 Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
804 trace!("Ping time estimated as: {} ms.", response_time_ms);
805
806 // record the response time
807 self.network_response_times_ms.push(response_time_ms);
808
809 // prune old response times. Keep at most three.
810 while self.network_response_times_ms.len() > 3 {
811 self.network_response_times_ms.remove(0);
812 }
813
814 // stats::median is experimental. So we calculate the median of up to three ourselves.
815 let ping_time_ms: usize = match self.network_response_times_ms.len() {
816 1 => self.network_response_times_ms[0] as usize,
817 2 => {
818 ((self.network_response_times_ms[0]
819 + self.network_response_times_ms[1])
820 / 2) as usize
821 }
822 3 => {
823 let mut times = self.network_response_times_ms.clone();
824 times.sort();
825 times[1]
826 }
827 _ => unreachable!(),
828 };
829
830 // store our new estimate for everyone to see
831 self.shared
832 .ping_time_ms
833 .store(ping_time_ms, atomic::Ordering::Relaxed);
834 }
835 Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
836 self.output
837 .as_mut()
838 .unwrap()
839 .seek(SeekFrom::Start(data.offset as u64))
840 .unwrap();
841 self.output
842 .as_mut()
843 .unwrap()
844 .write_all(data.data.as_ref())
845 .unwrap();
846
847 let mut full = false;
848
849 {
850 let mut download_status = self.shared.download_status.lock().unwrap();
851
852 let received_range = Range::new(data.offset, data.data.len());
853 download_status.downloaded.add_range(&received_range);
854 self.shared.cond.notify_all();
855
856 if download_status.downloaded.contained_length_from_value(0)
857 >= self.shared.file_size
858 {
859 full = true;
860 }
861
862 drop(download_status);
863 }
864
865 if full {
866 self.finish();
867 return Ok(Async::Ready(()));
868 }
869 }
870 Ok(Async::NotReady) => {
871 return Ok(Async::NotReady);
872 }
873 Err(()) => unreachable!(),
874 }
875 }
876 }
877
poll_stream_loader_command_rx(&mut self) -> Poll<(), ()>878 fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
879 loop {
880 match self.stream_loader_command_rx.poll() {
881 Ok(Async::Ready(None)) => {
882 return Ok(Async::Ready(()));
883 }
884 Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
885 self.download_range(request.start, request.length);
886 }
887 Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
888 *(self.shared.download_strategy.lock().unwrap()) =
889 DownloadStrategy::RandomAccess();
890 }
891 Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
892 *(self.shared.download_strategy.lock().unwrap()) =
893 DownloadStrategy::Streaming();
894 }
895 Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
896 return Ok(Async::Ready(()));
897 }
898 Ok(Async::NotReady) => return Ok(Async::NotReady),
899 Err(()) => unreachable!(),
900 }
901 }
902 }
903
finish(&mut self)904 fn finish(&mut self) {
905 let mut output = self.output.take().unwrap();
906 let complete_tx = self.complete_tx.take().unwrap();
907
908 output.seek(SeekFrom::Start(0)).unwrap();
909 let _ = complete_tx.send(output);
910 }
911 }
912
913 impl Future for AudioFileFetch {
914 type Item = ();
915 type Error = ();
916
poll(&mut self) -> Poll<(), ()>917 fn poll(&mut self) -> Poll<(), ()> {
918 match self.poll_stream_loader_command_rx() {
919 Ok(Async::NotReady) => (),
920 Ok(Async::Ready(_)) => {
921 return Ok(Async::Ready(()));
922 }
923 Err(()) => unreachable!(),
924 }
925
926 match self.poll_file_data_rx() {
927 Ok(Async::NotReady) => (),
928 Ok(Async::Ready(_)) => {
929 return Ok(Async::Ready(()));
930 }
931 Err(()) => unreachable!(),
932 }
933
934 if let DownloadStrategy::Streaming() = self.get_download_strategy() {
935 let number_of_open_requests = self
936 .shared
937 .number_of_open_requests
938 .load(atomic::Ordering::SeqCst);
939 let max_requests_to_send =
940 MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
941
942 if max_requests_to_send > 0 {
943 let bytes_pending: usize = {
944 let download_status = self.shared.download_status.lock().unwrap();
945 download_status
946 .requested
947 .minus(&download_status.downloaded)
948 .len()
949 };
950
951 let ping_time_seconds =
952 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
953 let download_rate = self.session.channel().get_download_rate_estimate();
954
955 let desired_pending_bytes = max(
956 (PREFETCH_THRESHOLD_FACTOR
957 * ping_time_seconds
958 * self.shared.stream_data_rate as f64) as usize,
959 (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
960 as usize,
961 );
962
963 if bytes_pending < desired_pending_bytes {
964 self.pre_fetch_more_data(
965 desired_pending_bytes - bytes_pending,
966 max_requests_to_send,
967 );
968 }
969 }
970 }
971
972 return Ok(Async::NotReady);
973 }
974 }
975
976 impl Read for AudioFileStreaming {
read(&mut self, output: &mut [u8]) -> io::Result<usize>977 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
978 let offset = self.position as usize;
979
980 if offset >= self.shared.file_size {
981 return Ok(0);
982 }
983
984 let length = min(output.len(), self.shared.file_size - offset);
985
986 let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
987 DownloadStrategy::RandomAccess() => length,
988 DownloadStrategy::Streaming() => {
989 // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
990 let ping_time_seconds =
991 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
992
993 let length_to_request = length
994 + max(
995 (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
996 as usize,
997 (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
998 * ping_time_seconds
999 * self.shared.stream_data_rate as f64) as usize,
1000 );
1001 min(length_to_request, self.shared.file_size - offset)
1002 }
1003 };
1004
1005 let mut ranges_to_request = RangeSet::new();
1006 ranges_to_request.add_range(&Range::new(offset, length_to_request));
1007
1008 let mut download_status = self.shared.download_status.lock().unwrap();
1009 ranges_to_request.subtract_range_set(&download_status.downloaded);
1010 ranges_to_request.subtract_range_set(&download_status.requested);
1011
1012 for range in ranges_to_request.iter() {
1013 self.stream_loader_command_tx
1014 .unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
1015 .unwrap();
1016 }
1017
1018 if length == 0 {
1019 return Ok(0);
1020 }
1021
1022 let mut download_message_printed = false;
1023 while !download_status.downloaded.contains(offset) {
1024 if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() {
1025 if !download_message_printed {
1026 debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
1027 download_message_printed = true;
1028 }
1029 }
1030 download_status = self
1031 .shared
1032 .cond
1033 .wait_timeout(download_status, Duration::from_millis(1000))
1034 .unwrap()
1035 .0;
1036 }
1037 let available_length = download_status
1038 .downloaded
1039 .contained_length_from_value(offset);
1040 assert!(available_length > 0);
1041 drop(download_status);
1042
1043 self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
1044 let read_len = min(length, available_length);
1045 let read_len = self.read_file.read(&mut output[..read_len])?;
1046
1047 if download_message_printed {
1048 debug!(
1049 "Read at postion {} completed. {} bytes returned, {} bytes were requested.",
1050 offset,
1051 read_len,
1052 output.len()
1053 );
1054 }
1055
1056 self.position += read_len as u64;
1057 self.shared
1058 .read_position
1059 .store(self.position as usize, atomic::Ordering::Relaxed);
1060
1061 return Ok(read_len);
1062 }
1063 }
1064
1065 impl Seek for AudioFileStreaming {
seek(&mut self, pos: SeekFrom) -> io::Result<u64>1066 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1067 self.position = self.read_file.seek(pos)?;
1068 // Do not seek past EOF
1069 self.shared
1070 .read_position
1071 .store(self.position as usize, atomic::Ordering::Relaxed);
1072 Ok(self.position)
1073 }
1074 }
1075
1076 impl Read for AudioFile {
read(&mut self, output: &mut [u8]) -> io::Result<usize>1077 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
1078 match *self {
1079 AudioFile::Cached(ref mut file) => file.read(output),
1080 AudioFile::Streaming(ref mut file) => file.read(output),
1081 }
1082 }
1083 }
1084
1085 impl Seek for AudioFile {
seek(&mut self, pos: SeekFrom) -> io::Result<u64>1086 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1087 match *self {
1088 AudioFile::Cached(ref mut file) => file.seek(pos),
1089 AudioFile::Streaming(ref mut file) => file.seek(pos),
1090 }
1091 }
1092 }
1093