1 use crate::range_set::{Range, RangeSet};
2 use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
3 use bytes::Bytes;
4 use futures::sync::{mpsc, oneshot};
5 use futures::Stream;
6 use futures::{Async, Future, Poll};
7 use std::cmp::{max, min};
8 use std::fs;
9 use std::io::{self, Read, Seek, SeekFrom, Write};
10 use std::sync::{Arc, Condvar, Mutex};
11 use std::time::{Duration, Instant};
12 use tempfile::NamedTempFile;
13 
14 use futures::sync::mpsc::unbounded;
15 use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
16 use librespot_core::session::Session;
17 use librespot_core::spotify_id::FileId;
18 use std::sync::atomic;
19 use std::sync::atomic::AtomicUsize;
20 
21 const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
22 // The minimum size of a block that is requested from the Spotify servers in one request.
23 // This is the block size that is typically requested while doing a seek() on a file.
24 // Note: smaller requests can happen if part of the block is downloaded already.
25 
26 const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16;
27 // The amount of data that is requested when initially opening a file.
28 // Note: if the file is opened to play from the beginning, the amount of data to
29 // read ahead is requested in addition to this amount. If the file is opened to seek to
30 // another position, then only this amount is requested on the first request.
31 
32 const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;
33 // The pig time that is used for calculations before a ping time was actually measured.
34 
35 const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;
36 // If the measured ping time to the Spotify server is larger than this value, it is capped
37 // to avoid run-away block sizes and pre-fetching.
38 
39 pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;
40 // Before playback starts, this many seconds of data must be present.
41 // Note: the calculations are done using the nominal bitrate of the file. The actual amount
42 // of audio data may be larger or smaller.
43 
44 pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;
45 // Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
46 // time to the Spotify server.
47 // Both, READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
48 // obeyed.
49 // Note: the calculations are done using the nominal bitrate of the file. The actual amount
50 // of audio data may be larger or smaller.
51 
52 pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0;
53 // While playing back, this many seconds of data ahead of the current read position are
54 // requested.
55 // Note: the calculations are done using the nominal bitrate of the file. The actual amount
56 // of audio data may be larger or smaller.
57 
58 pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0;
59 // Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
60 // time to the Spotify server.
61 // Note: the calculations are done using the nominal bitrate of the file. The actual amount
62 // of audio data may be larger or smaller.
63 
64 const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;
65 // If the amount of data that is pending (requested but not received) is less than a certain amount,
66 // data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
67 // data is calculated as
68 // <pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>
69 
70 const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;
71 // Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account.
72 // The formula used is
73 // <pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>
74 // This mechanism allows for fast downloading of the remainder of the file. The number should be larger
75 // than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster
76 // the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is
77 // performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
78 // only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.
79 
80 const MAX_PREFETCH_REQUESTS: usize = 4;
81 // Limit the number of requests that are pending simultaneously before pre-fetching data. Pending
82 // requests share bandwidth. Thus, havint too many requests can lead to the one that is needed next
83 // for playback to be delayed leading to a buffer underrun. This limit has the effect that a new
84 // pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are pending.
85 
86 pub enum AudioFile {
87     Cached(fs::File),
88     Streaming(AudioFileStreaming),
89 }
90 
91 pub enum AudioFileOpen {
92     Cached(Option<fs::File>),
93     Streaming(AudioFileOpenStreaming),
94 }
95 
96 pub struct AudioFileOpenStreaming {
97     session: Session,
98     initial_data_rx: Option<ChannelData>,
99     initial_data_length: Option<usize>,
100     initial_request_sent_time: Instant,
101     headers: ChannelHeaders,
102     file_id: FileId,
103     complete_tx: Option<oneshot::Sender<NamedTempFile>>,
104     streaming_data_rate: usize,
105 }
106 
107 enum StreamLoaderCommand {
108     Fetch(Range),       // signal the stream loader to fetch a range of the file
109     RandomAccessMode(), // optimise download strategy for random access
110     StreamMode(),       // optimise download strategy for streaming
111     Close(),            // terminate and don't load any more data
112 }
113 
114 #[derive(Clone)]
115 pub struct StreamLoaderController {
116     channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
117     stream_shared: Option<Arc<AudioFileShared>>,
118     file_size: usize,
119 }
120 
121 impl StreamLoaderController {
len(&self) -> usize122     pub fn len(&self) -> usize {
123         return self.file_size;
124     }
125 
range_available(&self, range: Range) -> bool126     pub fn range_available(&self, range: Range) -> bool {
127         if let Some(ref shared) = self.stream_shared {
128             let download_status = shared.download_status.lock().unwrap();
129             if range.length
130                 <= download_status
131                     .downloaded
132                     .contained_length_from_value(range.start)
133             {
134                 return true;
135             } else {
136                 return false;
137             }
138         } else {
139             if range.length <= self.len() - range.start {
140                 return true;
141             } else {
142                 return false;
143             }
144         }
145     }
146 
ping_time_ms(&self) -> usize147     pub fn ping_time_ms(&self) -> usize {
148         if let Some(ref shared) = self.stream_shared {
149             return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
150         } else {
151             return 0;
152         }
153     }
154 
send_stream_loader_command(&mut self, command: StreamLoaderCommand)155     fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
156         if let Some(ref mut channel) = self.channel_tx {
157             // ignore the error in case the channel has been closed already.
158             let _ = channel.unbounded_send(command);
159         }
160     }
161 
fetch(&mut self, range: Range)162     pub fn fetch(&mut self, range: Range) {
163         // signal the stream loader to fetch a range of the file
164         self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
165     }
166 
fetch_blocking(&mut self, mut range: Range)167     pub fn fetch_blocking(&mut self, mut range: Range) {
168         // signal the stream loader to tech a range of the file and block until it is loaded.
169 
170         // ensure the range is within the file's bounds.
171         if range.start >= self.len() {
172             range.length = 0;
173         } else if range.end() > self.len() {
174             range.length = self.len() - range.start;
175         }
176 
177         self.fetch(range);
178 
179         if let Some(ref shared) = self.stream_shared {
180             let mut download_status = shared.download_status.lock().unwrap();
181             while range.length
182                 > download_status
183                     .downloaded
184                     .contained_length_from_value(range.start)
185             {
186                 download_status = shared
187                     .cond
188                     .wait_timeout(download_status, Duration::from_millis(1000))
189                     .unwrap()
190                     .0;
191                 if range.length
192                     > (download_status
193                         .downloaded
194                         .union(&download_status.requested)
195                         .contained_length_from_value(range.start))
196                 {
197                     // For some reason, the requested range is neither downloaded nor requested.
198                     // This could be due to a network error. Request it again.
199                     // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
200                     if let Some(ref mut channel) = self.channel_tx {
201                         // ignore the error in case the channel has been closed already.
202                         let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
203                     }
204                 }
205             }
206         }
207     }
208 
fetch_next(&mut self, length: usize)209     pub fn fetch_next(&mut self, length: usize) {
210         let range: Range = if let Some(ref shared) = self.stream_shared {
211             Range {
212                 start: shared.read_position.load(atomic::Ordering::Relaxed),
213                 length: length,
214             }
215         } else {
216             return;
217         };
218         self.fetch(range);
219     }
220 
fetch_next_blocking(&mut self, length: usize)221     pub fn fetch_next_blocking(&mut self, length: usize) {
222         let range: Range = if let Some(ref shared) = self.stream_shared {
223             Range {
224                 start: shared.read_position.load(atomic::Ordering::Relaxed),
225                 length: length,
226             }
227         } else {
228             return;
229         };
230         self.fetch_blocking(range);
231     }
232 
set_random_access_mode(&mut self)233     pub fn set_random_access_mode(&mut self) {
234         // optimise download strategy for random access
235         self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
236     }
237 
set_stream_mode(&mut self)238     pub fn set_stream_mode(&mut self) {
239         // optimise download strategy for streaming
240         self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
241     }
242 
close(&mut self)243     pub fn close(&mut self) {
244         // terminate stream loading and don't load any more data for this file.
245         self.send_stream_loader_command(StreamLoaderCommand::Close());
246     }
247 }
248 
249 pub struct AudioFileStreaming {
250     read_file: fs::File,
251 
252     position: u64,
253 
254     stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,
255 
256     shared: Arc<AudioFileShared>,
257 }
258 
259 struct AudioFileDownloadStatus {
260     requested: RangeSet,
261     downloaded: RangeSet,
262 }
263 
264 #[derive(Copy, Clone)]
265 enum DownloadStrategy {
266     RandomAccess(),
267     Streaming(),
268 }
269 
270 struct AudioFileShared {
271     file_id: FileId,
272     file_size: usize,
273     stream_data_rate: usize,
274     cond: Condvar,
275     download_status: Mutex<AudioFileDownloadStatus>,
276     download_strategy: Mutex<DownloadStrategy>,
277     number_of_open_requests: AtomicUsize,
278     ping_time_ms: AtomicUsize,
279     read_position: AtomicUsize,
280 }
281 
282 impl AudioFileOpenStreaming {
finish(&mut self, size: usize) -> AudioFileStreaming283     fn finish(&mut self, size: usize) -> AudioFileStreaming {
284         let shared = Arc::new(AudioFileShared {
285             file_id: self.file_id,
286             file_size: size,
287             stream_data_rate: self.streaming_data_rate,
288             cond: Condvar::new(),
289             download_status: Mutex::new(AudioFileDownloadStatus {
290                 requested: RangeSet::new(),
291                 downloaded: RangeSet::new(),
292             }),
293             download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
294             number_of_open_requests: AtomicUsize::new(0),
295             ping_time_ms: AtomicUsize::new(0),
296             read_position: AtomicUsize::new(0),
297         });
298 
299         let mut write_file = NamedTempFile::new().unwrap();
300         write_file.as_file().set_len(size as u64).unwrap();
301         write_file.seek(SeekFrom::Start(0)).unwrap();
302 
303         let read_file = write_file.reopen().unwrap();
304 
305         let initial_data_rx = self.initial_data_rx.take().unwrap();
306         let initial_data_length = self.initial_data_length.take().unwrap();
307         let complete_tx = self.complete_tx.take().unwrap();
308         //let (seek_tx, seek_rx) = mpsc::unbounded();
309         let (stream_loader_command_tx, stream_loader_command_rx) =
310             mpsc::unbounded::<StreamLoaderCommand>();
311 
312         let fetcher = AudioFileFetch::new(
313             self.session.clone(),
314             shared.clone(),
315             initial_data_rx,
316             self.initial_request_sent_time,
317             initial_data_length,
318             write_file,
319             stream_loader_command_rx,
320             complete_tx,
321         );
322         self.session.spawn(move |_| fetcher);
323 
324         AudioFileStreaming {
325             read_file: read_file,
326 
327             position: 0,
328             //seek: seek_tx,
329             stream_loader_command_tx: stream_loader_command_tx,
330 
331             shared: shared,
332         }
333     }
334 }
335 
336 impl Future for AudioFileOpen {
337     type Item = AudioFile;
338     type Error = ChannelError;
339 
poll(&mut self) -> Poll<AudioFile, ChannelError>340     fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
341         match *self {
342             AudioFileOpen::Streaming(ref mut open) => {
343                 let file = try_ready!(open.poll());
344                 Ok(Async::Ready(AudioFile::Streaming(file)))
345             }
346             AudioFileOpen::Cached(ref mut file) => {
347                 let file = file.take().unwrap();
348                 Ok(Async::Ready(AudioFile::Cached(file)))
349             }
350         }
351     }
352 }
353 
354 impl Future for AudioFileOpenStreaming {
355     type Item = AudioFileStreaming;
356     type Error = ChannelError;
357 
poll(&mut self) -> Poll<AudioFileStreaming, ChannelError>358     fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
359         loop {
360             let (id, data) = try_ready!(self.headers.poll()).unwrap();
361 
362             if id == 0x3 {
363                 let size = BigEndian::read_u32(&data) as usize * 4;
364                 let file = self.finish(size);
365 
366                 return Ok(Async::Ready(file));
367             }
368         }
369     }
370 }
371 
372 impl AudioFile {
open( session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool, ) -> AudioFileOpen373     pub fn open(
374         session: &Session,
375         file_id: FileId,
376         bytes_per_second: usize,
377         play_from_beginning: bool,
378     ) -> AudioFileOpen {
379         let cache = session.cache().cloned();
380 
381         if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
382             debug!("File {} already in cache", file_id);
383             return AudioFileOpen::Cached(Some(file));
384         }
385 
386         debug!("Downloading file {}", file_id);
387 
388         let (complete_tx, complete_rx) = oneshot::channel();
389         let mut initial_data_length = if play_from_beginning {
390             INITIAL_DOWNLOAD_SIZE
391                 + max(
392                     (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
393                     (INITIAL_PING_TIME_ESTIMATE_SECONDS
394                         * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
395                         * bytes_per_second as f64) as usize,
396                 )
397         } else {
398             INITIAL_DOWNLOAD_SIZE
399         };
400         if initial_data_length % 4 != 0 {
401             initial_data_length += 4 - (initial_data_length % 4);
402         }
403         let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
404 
405         let open = AudioFileOpenStreaming {
406             session: session.clone(),
407             file_id: file_id,
408 
409             headers: headers,
410             initial_data_rx: Some(data),
411             initial_data_length: Some(initial_data_length),
412             initial_request_sent_time: Instant::now(),
413 
414             complete_tx: Some(complete_tx),
415             streaming_data_rate: bytes_per_second,
416         };
417 
418         let session_ = session.clone();
419         session.spawn(move |_| {
420             complete_rx
421                 .map(move |mut file| {
422                     if let Some(cache) = session_.cache() {
423                         cache.save_file(file_id, &mut file);
424                         debug!("File {} complete, saving to cache", file_id);
425                     } else {
426                         debug!("File {} complete", file_id);
427                     }
428                 })
429                 .or_else(|oneshot::Canceled| Ok(()))
430         });
431 
432         return AudioFileOpen::Streaming(open);
433     }
434 
get_stream_loader_controller(&self) -> StreamLoaderController435     pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
436         match self {
437             AudioFile::Streaming(ref stream) => {
438                 return StreamLoaderController {
439                     channel_tx: Some(stream.stream_loader_command_tx.clone()),
440                     stream_shared: Some(stream.shared.clone()),
441                     file_size: stream.shared.file_size,
442                 };
443             }
444             AudioFile::Cached(ref file) => {
445                 return StreamLoaderController {
446                     channel_tx: None,
447                     stream_shared: None,
448                     file_size: file.metadata().unwrap().len() as usize,
449                 };
450             }
451         }
452     }
453 }
454 
request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel455 fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
456     assert!(
457         offset % 4 == 0,
458         "Range request start positions must be aligned by 4 bytes."
459     );
460     assert!(
461         length % 4 == 0,
462         "Range request range lengths must be aligned by 4 bytes."
463     );
464     let start = offset / 4;
465     let end = (offset + length) / 4;
466 
467     let (id, channel) = session.channel().allocate();
468 
469     let mut data: Vec<u8> = Vec::new();
470     data.write_u16::<BigEndian>(id).unwrap();
471     data.write_u8(0).unwrap();
472     data.write_u8(1).unwrap();
473     data.write_u16::<BigEndian>(0x0000).unwrap();
474     data.write_u32::<BigEndian>(0x00000000).unwrap();
475     data.write_u32::<BigEndian>(0x00009C40).unwrap();
476     data.write_u32::<BigEndian>(0x00020000).unwrap();
477     data.write(&file.0).unwrap();
478     data.write_u32::<BigEndian>(start as u32).unwrap();
479     data.write_u32::<BigEndian>(end as u32).unwrap();
480 
481     session.send_packet(0x8, data);
482 
483     channel
484 }
485 
486 struct PartialFileData {
487     offset: usize,
488     data: Bytes,
489 }
490 
491 enum ReceivedData {
492     ResponseTimeMs(usize),
493     Data(PartialFileData),
494 }
495 
496 struct AudioFileFetchDataReceiver {
497     shared: Arc<AudioFileShared>,
498     file_data_tx: mpsc::UnboundedSender<ReceivedData>,
499     data_rx: ChannelData,
500     initial_data_offset: usize,
501     initial_request_length: usize,
502     data_offset: usize,
503     request_length: usize,
504     request_sent_time: Option<Instant>,
505     measure_ping_time: bool,
506 }
507 
508 impl AudioFileFetchDataReceiver {
new( shared: Arc<AudioFileShared>, file_data_tx: mpsc::UnboundedSender<ReceivedData>, data_rx: ChannelData, data_offset: usize, request_length: usize, request_sent_time: Instant, ) -> AudioFileFetchDataReceiver509     fn new(
510         shared: Arc<AudioFileShared>,
511         file_data_tx: mpsc::UnboundedSender<ReceivedData>,
512         data_rx: ChannelData,
513         data_offset: usize,
514         request_length: usize,
515         request_sent_time: Instant,
516     ) -> AudioFileFetchDataReceiver {
517         let measure_ping_time = shared
518             .number_of_open_requests
519             .load(atomic::Ordering::SeqCst)
520             == 0;
521 
522         shared
523             .number_of_open_requests
524             .fetch_add(1, atomic::Ordering::SeqCst);
525 
526         AudioFileFetchDataReceiver {
527             shared: shared,
528             data_rx: data_rx,
529             file_data_tx: file_data_tx,
530             initial_data_offset: data_offset,
531             initial_request_length: request_length,
532             data_offset: data_offset,
533             request_length: request_length,
534             request_sent_time: Some(request_sent_time),
535             measure_ping_time: measure_ping_time,
536         }
537     }
538 }
539 
540 impl AudioFileFetchDataReceiver {
finish(&mut self)541     fn finish(&mut self) {
542         if self.request_length > 0 {
543             let missing_range = Range::new(self.data_offset, self.request_length);
544 
545             let mut download_status = self.shared.download_status.lock().unwrap();
546             download_status.requested.subtract_range(&missing_range);
547             self.shared.cond.notify_all();
548         }
549 
550         self.shared
551             .number_of_open_requests
552             .fetch_sub(1, atomic::Ordering::SeqCst);
553     }
554 }
555 
556 impl Future for AudioFileFetchDataReceiver {
557     type Item = ();
558     type Error = ();
559 
poll(&mut self) -> Poll<(), ()>560     fn poll(&mut self) -> Poll<(), ()> {
561         loop {
562             match self.data_rx.poll() {
563                 Ok(Async::Ready(Some(data))) => {
564                     if self.measure_ping_time {
565                         if let Some(request_sent_time) = self.request_sent_time {
566                             let duration = Instant::now() - request_sent_time;
567                             let duration_ms: u64;
568                             if 0.001 * (duration.as_millis() as f64)
569                                 > MAXIMUM_ASSUMED_PING_TIME_SECONDS
570                             {
571                                 duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
572                             } else {
573                                 duration_ms = duration.as_millis() as u64;
574                             }
575                             let _ = self
576                                 .file_data_tx
577                                 .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
578                             self.measure_ping_time = false;
579                         }
580                     }
581                     let data_size = data.len();
582                     let _ = self
583                         .file_data_tx
584                         .unbounded_send(ReceivedData::Data(PartialFileData {
585                             offset: self.data_offset,
586                             data: data,
587                         }));
588                     self.data_offset += data_size;
589                     if self.request_length < data_size {
590                         warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
591                         self.request_length = 0;
592                     } else {
593                         self.request_length -= data_size;
594                     }
595                     if self.request_length == 0 {
596                         self.finish();
597                         return Ok(Async::Ready(()));
598                     }
599                 }
600                 Ok(Async::Ready(None)) => {
601                     if self.request_length > 0 {
602                         warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
603                     }
604                     self.finish();
605                     return Ok(Async::Ready(()));
606                 }
607                 Ok(Async::NotReady) => {
608                     return Ok(Async::NotReady);
609                 }
610                 Err(ChannelError) => {
611                     warn!(
612                         "Error from channel for data receiver for range {} (+{}).",
613                         self.initial_data_offset, self.initial_request_length
614                     );
615                     self.finish();
616                     return Ok(Async::Ready(()));
617                 }
618             }
619         }
620     }
621 }
622 
623 struct AudioFileFetch {
624     session: Session,
625     shared: Arc<AudioFileShared>,
626     output: Option<NamedTempFile>,
627 
628     file_data_tx: mpsc::UnboundedSender<ReceivedData>,
629     file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,
630 
631     stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
632     complete_tx: Option<oneshot::Sender<NamedTempFile>>,
633     network_response_times_ms: Vec<usize>,
634 }
635 
636 impl AudioFileFetch {
new( session: Session, shared: Arc<AudioFileShared>, initial_data_rx: ChannelData, initial_request_sent_time: Instant, initial_data_length: usize, output: NamedTempFile, stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>, complete_tx: oneshot::Sender<NamedTempFile>, ) -> AudioFileFetch637     fn new(
638         session: Session,
639         shared: Arc<AudioFileShared>,
640         initial_data_rx: ChannelData,
641         initial_request_sent_time: Instant,
642         initial_data_length: usize,
643 
644         output: NamedTempFile,
645         stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
646         complete_tx: oneshot::Sender<NamedTempFile>,
647     ) -> AudioFileFetch {
648         let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
649 
650         {
651             let requested_range = Range::new(0, initial_data_length);
652             let mut download_status = shared.download_status.lock().unwrap();
653             download_status.requested.add_range(&requested_range);
654         }
655 
656         let initial_data_receiver = AudioFileFetchDataReceiver::new(
657             shared.clone(),
658             file_data_tx.clone(),
659             initial_data_rx,
660             0,
661             initial_data_length,
662             initial_request_sent_time,
663         );
664 
665         session.spawn(move |_| initial_data_receiver);
666 
667         AudioFileFetch {
668             session: session,
669             shared: shared,
670             output: Some(output),
671 
672             file_data_tx: file_data_tx,
673             file_data_rx: file_data_rx,
674 
675             stream_loader_command_rx: stream_loader_command_rx,
676             complete_tx: Some(complete_tx),
677             network_response_times_ms: Vec::new(),
678         }
679     }
680 
get_download_strategy(&mut self) -> DownloadStrategy681     fn get_download_strategy(&mut self) -> DownloadStrategy {
682         *(self.shared.download_strategy.lock().unwrap())
683     }
684 
download_range(&mut self, mut offset: usize, mut length: usize)685     fn download_range(&mut self, mut offset: usize, mut length: usize) {
686         if length < MINIMUM_DOWNLOAD_SIZE {
687             length = MINIMUM_DOWNLOAD_SIZE;
688         }
689 
690         // ensure the values are within the bounds and align them by 4 for the spotify protocol.
691         if offset >= self.shared.file_size {
692             return;
693         }
694 
695         if length <= 0 {
696             return;
697         }
698 
699         if offset + length > self.shared.file_size {
700             length = self.shared.file_size - offset;
701         }
702 
703         if offset % 4 != 0 {
704             length += offset % 4;
705             offset -= offset % 4;
706         }
707 
708         if length % 4 != 0 {
709             length += 4 - (length % 4);
710         }
711 
712         let mut ranges_to_request = RangeSet::new();
713         ranges_to_request.add_range(&Range::new(offset, length));
714 
715         let mut download_status = self.shared.download_status.lock().unwrap();
716 
717         ranges_to_request.subtract_range_set(&download_status.downloaded);
718         ranges_to_request.subtract_range_set(&download_status.requested);
719 
720         for range in ranges_to_request.iter() {
721             let (_headers, data) = request_range(
722                 &self.session,
723                 self.shared.file_id,
724                 range.start,
725                 range.length,
726             )
727             .split();
728 
729             download_status.requested.add_range(range);
730 
731             let receiver = AudioFileFetchDataReceiver::new(
732                 self.shared.clone(),
733                 self.file_data_tx.clone(),
734                 data,
735                 range.start,
736                 range.length,
737                 Instant::now(),
738             );
739 
740             self.session.spawn(move |_| receiver);
741         }
742     }
743 
pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize)744     fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) {
745         let mut bytes_to_go = bytes;
746         let mut requests_to_go = max_requests_to_send;
747 
748         while bytes_to_go > 0 && requests_to_go > 0 {
749             // determine what is still missing
750             let mut missing_data = RangeSet::new();
751             missing_data.add_range(&Range::new(0, self.shared.file_size));
752             {
753                 let download_status = self.shared.download_status.lock().unwrap();
754                 missing_data.subtract_range_set(&download_status.downloaded);
755                 missing_data.subtract_range_set(&download_status.requested);
756             }
757 
758             // download data from after the current read position first
759             let mut tail_end = RangeSet::new();
760             let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
761             tail_end.add_range(&Range::new(
762                 read_position,
763                 self.shared.file_size - read_position,
764             ));
765             let tail_end = tail_end.intersection(&missing_data);
766 
767             if !tail_end.is_empty() {
768                 let range = tail_end.get_range(0);
769                 let offset = range.start;
770                 let length = min(range.length, bytes_to_go);
771                 self.download_range(offset, length);
772                 requests_to_go -= 1;
773                 bytes_to_go -= length;
774             } else if !missing_data.is_empty() {
775                 // ok, the tail is downloaded, download something fom the beginning.
776                 let range = missing_data.get_range(0);
777                 let offset = range.start;
778                 let length = min(range.length, bytes_to_go);
779                 self.download_range(offset, length);
780                 requests_to_go -= 1;
781                 bytes_to_go -= length;
782             } else {
783                 return;
784             }
785         }
786     }
787 
poll_file_data_rx(&mut self) -> Poll<(), ()>788     fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
789         loop {
790             match self.file_data_rx.poll() {
791                 Ok(Async::Ready(None)) => {
792                     return Ok(Async::Ready(()));
793                 }
794                 Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
795                     trace!("Ping time estimated as: {} ms.", response_time_ms);
796 
797                     // record the response time
798                     self.network_response_times_ms.push(response_time_ms);
799 
800                     // prune old response times. Keep at most three.
801                     while self.network_response_times_ms.len() > 3 {
802                         self.network_response_times_ms.remove(0);
803                     }
804 
805                     // stats::median is experimental. So we calculate the median of up to three ourselves.
806                     let ping_time_ms: usize = match self.network_response_times_ms.len() {
807                         1 => self.network_response_times_ms[0] as usize,
808                         2 => {
809                             ((self.network_response_times_ms[0]
810                                 + self.network_response_times_ms[1])
811                                 / 2) as usize
812                         }
813                         3 => {
814                             let mut times = self.network_response_times_ms.clone();
815                             times.sort();
816                             times[1]
817                         }
818                         _ => unreachable!(),
819                     };
820 
821                     // store our new estimate for everyone to see
822                     self.shared
823                         .ping_time_ms
824                         .store(ping_time_ms, atomic::Ordering::Relaxed);
825                 }
826                 Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
827                     self.output
828                         .as_mut()
829                         .unwrap()
830                         .seek(SeekFrom::Start(data.offset as u64))
831                         .unwrap();
832                     self.output
833                         .as_mut()
834                         .unwrap()
835                         .write_all(data.data.as_ref())
836                         .unwrap();
837 
838                     let mut full = false;
839 
840                     {
841                         let mut download_status = self.shared.download_status.lock().unwrap();
842 
843                         let received_range = Range::new(data.offset, data.data.len());
844                         download_status.downloaded.add_range(&received_range);
845                         self.shared.cond.notify_all();
846 
847                         if download_status.downloaded.contained_length_from_value(0)
848                             >= self.shared.file_size
849                         {
850                             full = true;
851                         }
852 
853                         drop(download_status);
854                     }
855 
856                     if full {
857                         self.finish();
858                         return Ok(Async::Ready(()));
859                     }
860                 }
861                 Ok(Async::NotReady) => {
862                     return Ok(Async::NotReady);
863                 }
864                 Err(()) => unreachable!(),
865             }
866         }
867     }
868 
poll_stream_loader_command_rx(&mut self) -> Poll<(), ()>869     fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
870         loop {
871             match self.stream_loader_command_rx.poll() {
872                 Ok(Async::Ready(None)) => {
873                     return Ok(Async::Ready(()));
874                 }
875                 Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
876                     self.download_range(request.start, request.length);
877                 }
878                 Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
879                     *(self.shared.download_strategy.lock().unwrap()) =
880                         DownloadStrategy::RandomAccess();
881                 }
882                 Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
883                     *(self.shared.download_strategy.lock().unwrap()) =
884                         DownloadStrategy::Streaming();
885                 }
886                 Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
887                     return Ok(Async::Ready(()));
888                 }
889                 Ok(Async::NotReady) => return Ok(Async::NotReady),
890                 Err(()) => unreachable!(),
891             }
892         }
893     }
894 
finish(&mut self)895     fn finish(&mut self) {
896         let mut output = self.output.take().unwrap();
897         let complete_tx = self.complete_tx.take().unwrap();
898 
899         output.seek(SeekFrom::Start(0)).unwrap();
900         let _ = complete_tx.send(output);
901     }
902 }
903 
904 impl Future for AudioFileFetch {
905     type Item = ();
906     type Error = ();
907 
poll(&mut self) -> Poll<(), ()>908     fn poll(&mut self) -> Poll<(), ()> {
909         match self.poll_stream_loader_command_rx() {
910             Ok(Async::NotReady) => (),
911             Ok(Async::Ready(_)) => {
912                 return Ok(Async::Ready(()));
913             }
914             Err(()) => unreachable!(),
915         }
916 
917         match self.poll_file_data_rx() {
918             Ok(Async::NotReady) => (),
919             Ok(Async::Ready(_)) => {
920                 return Ok(Async::Ready(()));
921             }
922             Err(()) => unreachable!(),
923         }
924 
925         if let DownloadStrategy::Streaming() = self.get_download_strategy() {
926             let number_of_open_requests = self
927                 .shared
928                 .number_of_open_requests
929                 .load(atomic::Ordering::SeqCst);
930             let max_requests_to_send =
931                 MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
932 
933             if max_requests_to_send > 0 {
934                 let bytes_pending: usize = {
935                     let download_status = self.shared.download_status.lock().unwrap();
936                     download_status
937                         .requested
938                         .minus(&download_status.downloaded)
939                         .len()
940                 };
941 
942                 let ping_time_seconds =
943                     0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
944                 let download_rate = self.session.channel().get_download_rate_estimate();
945 
946                 let desired_pending_bytes = max(
947                     (PREFETCH_THRESHOLD_FACTOR
948                         * ping_time_seconds
949                         * self.shared.stream_data_rate as f64) as usize,
950                     (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
951                         as usize,
952                 );
953 
954                 if bytes_pending < desired_pending_bytes {
955                     self.pre_fetch_more_data(
956                         desired_pending_bytes - bytes_pending,
957                         max_requests_to_send,
958                     );
959                 }
960             }
961         }
962 
963         return Ok(Async::NotReady);
964     }
965 }
966 
967 impl Read for AudioFileStreaming {
read(&mut self, output: &mut [u8]) -> io::Result<usize>968     fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
969         let offset = self.position as usize;
970 
971         if offset >= self.shared.file_size {
972             return Ok(0);
973         }
974 
975         let length = min(output.len(), self.shared.file_size - offset);
976 
977         let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
978             DownloadStrategy::RandomAccess() => length,
979             DownloadStrategy::Streaming() => {
980                 // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
981                 let ping_time_seconds =
982                     0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
983 
984                 let length_to_request = length
985                     + max(
986                         (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
987                             as usize,
988                         (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
989                             * ping_time_seconds
990                             * self.shared.stream_data_rate as f64) as usize,
991                     );
992                 min(length_to_request, self.shared.file_size - offset)
993             }
994         };
995 
996         let mut ranges_to_request = RangeSet::new();
997         ranges_to_request.add_range(&Range::new(offset, length_to_request));
998 
999         let mut download_status = self.shared.download_status.lock().unwrap();
1000         ranges_to_request.subtract_range_set(&download_status.downloaded);
1001         ranges_to_request.subtract_range_set(&download_status.requested);
1002 
1003         for range in ranges_to_request.iter() {
1004             self.stream_loader_command_tx
1005                 .unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
1006                 .unwrap();
1007         }
1008 
1009         if length == 0 {
1010             return Ok(0);
1011         }
1012 
1013         let mut download_message_printed = false;
1014         while !download_status.downloaded.contains(offset) {
1015             if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() {
1016                 if !download_message_printed {
1017                     debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
1018                     download_message_printed = true;
1019                 }
1020             }
1021             download_status = self
1022                 .shared
1023                 .cond
1024                 .wait_timeout(download_status, Duration::from_millis(1000))
1025                 .unwrap()
1026                 .0;
1027         }
1028         let available_length = download_status
1029             .downloaded
1030             .contained_length_from_value(offset);
1031         assert!(available_length > 0);
1032         drop(download_status);
1033 
1034         self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
1035         let read_len = min(length, available_length);
1036         let read_len = self.read_file.read(&mut output[..read_len])?;
1037 
1038         if download_message_printed {
1039             debug!(
1040                 "Read at postion {} completed. {} bytes returned, {} bytes were requested.",
1041                 offset,
1042                 read_len,
1043                 output.len()
1044             );
1045         }
1046 
1047         self.position += read_len as u64;
1048         self.shared
1049             .read_position
1050             .store(self.position as usize, atomic::Ordering::Relaxed);
1051 
1052         return Ok(read_len);
1053     }
1054 }
1055 
1056 impl Seek for AudioFileStreaming {
seek(&mut self, pos: SeekFrom) -> io::Result<u64>1057     fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1058         self.position = self.read_file.seek(pos)?;
1059         // Do not seek past EOF
1060         self.shared
1061             .read_position
1062             .store(self.position as usize, atomic::Ordering::Relaxed);
1063         Ok(self.position)
1064     }
1065 }
1066 
1067 impl Read for AudioFile {
read(&mut self, output: &mut [u8]) -> io::Result<usize>1068     fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
1069         match *self {
1070             AudioFile::Cached(ref mut file) => file.read(output),
1071             AudioFile::Streaming(ref mut file) => file.read(output),
1072         }
1073     }
1074 }
1075 
1076 impl Seek for AudioFile {
seek(&mut self, pos: SeekFrom) -> io::Result<u64>1077     fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1078         match *self {
1079             AudioFile::Cached(ref mut file) => file.seek(pos),
1080             AudioFile::Streaming(ref mut file) => file.seek(pos),
1081         }
1082     }
1083 }
1084