1 use crate::range_set::{Range, RangeSet};
2 use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
3 use bytes::Bytes;
4 use futures::sync::{mpsc, oneshot};
5 use futures::Stream;
6 use futures::{Async, Future, Poll};
7 use std::cmp::{max, min};
8 use std::fs;
9 use std::io::{self, Read, Seek, SeekFrom, Write};
10 use std::sync::{Arc, Condvar, Mutex};
11 use std::time::{Duration, Instant};
12 use tempfile::NamedTempFile;
13 
14 use futures::sync::mpsc::unbounded;
15 use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
16 use librespot_core::session::Session;
17 use librespot_core::spotify_id::FileId;
18 use std::sync::atomic;
19 use std::sync::atomic::AtomicUsize;
20 
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
// The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file.
// Note: smaller requests can happen if part of the block is downloaded already.

const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16;
// The amount of data that is requested when initially opening a file.
// Note: if the file is opened to play from the beginning, the amount of data to
// read ahead is requested in addition to this amount. If the file is opened to seek to
// another position, then only this amount is requested on the first request.

const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;
// The ping time that is used for calculations before a ping time was actually measured.

const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;
// If the measured ping time to the Spotify server is larger than this value, it is capped
// to avoid run-away block sizes and pre-fetching.

pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;
// Before playback starts, this many seconds of data must be present.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;
// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Both READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
// obeyed.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0;
// While playing back, this many seconds of data ahead of the current read position are
// requested.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0;
// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;
// If the amount of data that is pending (requested but not received) is less than a certain amount,
// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
// data is calculated as
// <pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>

const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;
// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account.
// The formula used is
// <pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>
// This mechanism allows for fast downloading of the remainder of the file. The number should be larger
// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster
// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is
// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.

const MAX_PREFETCH_REQUESTS: usize = 4;
// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending
// requests share bandwidth. Thus, having too many requests can lead to the one that is needed next
// for playback to be delayed leading to a buffer underrun. This limit has the effect that a new
// pre-fetch request is only sent if less than MAX_PREFETCH_REQUESTS are pending.
85 
/// An opened audio file.
///
/// `AudioFile::open` resolves to `Cached` when the session cache already holds the
/// file and to `Streaming` when the file has to be downloaded.
pub enum AudioFile {
    /// The file was found in the session cache and is read straight from disk.
    Cached(fs::File),
    /// The file is being downloaded while it is read.
    Streaming(AudioFileStreaming),
}
90 
/// Future returned by `AudioFile::open`; it resolves to an `AudioFile`.
pub enum AudioFileOpen {
    /// The file is already cached. The `Option` is emptied by the first `poll`, so
    /// polling again after completion panics.
    Cached(Option<fs::File>),
    /// The file is being requested from the server; completes once the header
    /// carrying the file size has been received.
    Streaming(AudioFileOpenStreaming),
}
95 
/// State of a streaming open that is still waiting for the file size header.
pub struct AudioFileOpenStreaming {
    // Session used to spawn the background fetcher.
    session: Session,
    // Data half of the channel of the initial request; taken by `finish`.
    initial_data_rx: Option<ChannelData>,
    // Number of bytes asked for by the initial request; taken by `finish`.
    initial_data_length: Option<usize>,
    // Time stamp recorded when the initial request was issued; handed to the fetcher.
    initial_request_sent_time: Instant,
    // Header half of the channel of the initial request; polled for the size header (id 0x3).
    headers: ChannelHeaders,
    // Identifier of the file being opened.
    file_id: FileId,
    // Sender handed to the fetcher together with the temporary file; taken by `finish`.
    complete_tx: Option<oneshot::Sender<NamedTempFile>>,
    // Nominal data rate of the stream in bytes per second (the `bytes_per_second` passed to `open`).
    streaming_data_rate: usize,
}
106 
/// Commands sent from a `StreamLoaderController` to the background stream loader
/// over an unbounded channel.
enum StreamLoaderCommand {
    Fetch(Range),       // signal the stream loader to fetch a range of the file
    RandomAccessMode(), // optimise download strategy for random access
    StreamMode(),       // optimise download strategy for streaming
    Close(),            // terminate and don't load any more data
}
113 
/// Handle for querying download progress and steering the stream loader.
///
/// For a cached file both `channel_tx` and `stream_shared` are `None`; commands
/// are then silently dropped and every range inside the file counts as available.
#[derive(Clone)]
pub struct StreamLoaderController {
    // Command channel to the stream loader; `None` for cached files.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    // State shared with the stream loader; `None` for cached files.
    stream_shared: Option<Arc<AudioFileShared>>,
    // Total size of the file in bytes.
    file_size: usize,
}
120 
121 impl StreamLoaderController {
len(&self) -> usize122     pub fn len(&self) -> usize {
123         return self.file_size;
124     }
125 
range_available(&self, range: Range) -> bool126     pub fn range_available(&self, range: Range) -> bool {
127         if let Some(ref shared) = self.stream_shared {
128             let download_status = shared.download_status.lock().unwrap();
129             if range.length
130                 <= download_status
131                     .downloaded
132                     .contained_length_from_value(range.start)
133             {
134                 return true;
135             } else {
136                 return false;
137             }
138         } else {
139             if range.length <= self.len() - range.start {
140                 return true;
141             } else {
142                 return false;
143             }
144         }
145     }
146 
range_to_end_available(&self) -> bool147     pub fn range_to_end_available(&self) -> bool {
148         if let Some(ref shared) = self.stream_shared {
149             let read_position = shared.read_position.load(atomic::Ordering::Relaxed);
150             self.range_available(Range::new(read_position, self.len() - read_position))
151         } else {
152             true
153         }
154     }
155 
ping_time_ms(&self) -> usize156     pub fn ping_time_ms(&self) -> usize {
157         if let Some(ref shared) = self.stream_shared {
158             return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
159         } else {
160             return 0;
161         }
162     }
163 
send_stream_loader_command(&mut self, command: StreamLoaderCommand)164     fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
165         if let Some(ref mut channel) = self.channel_tx {
166             // ignore the error in case the channel has been closed already.
167             let _ = channel.unbounded_send(command);
168         }
169     }
170 
fetch(&mut self, range: Range)171     pub fn fetch(&mut self, range: Range) {
172         // signal the stream loader to fetch a range of the file
173         self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
174     }
175 
fetch_blocking(&mut self, mut range: Range)176     pub fn fetch_blocking(&mut self, mut range: Range) {
177         // signal the stream loader to tech a range of the file and block until it is loaded.
178 
179         // ensure the range is within the file's bounds.
180         if range.start >= self.len() {
181             range.length = 0;
182         } else if range.end() > self.len() {
183             range.length = self.len() - range.start;
184         }
185 
186         self.fetch(range);
187 
188         if let Some(ref shared) = self.stream_shared {
189             let mut download_status = shared.download_status.lock().unwrap();
190             while range.length
191                 > download_status
192                     .downloaded
193                     .contained_length_from_value(range.start)
194             {
195                 download_status = shared
196                     .cond
197                     .wait_timeout(download_status, Duration::from_millis(1000))
198                     .unwrap()
199                     .0;
200                 if range.length
201                     > (download_status
202                         .downloaded
203                         .union(&download_status.requested)
204                         .contained_length_from_value(range.start))
205                 {
206                     // For some reason, the requested range is neither downloaded nor requested.
207                     // This could be due to a network error. Request it again.
208                     // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
209                     if let Some(ref mut channel) = self.channel_tx {
210                         // ignore the error in case the channel has been closed already.
211                         let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
212                     }
213                 }
214             }
215         }
216     }
217 
fetch_next(&mut self, length: usize)218     pub fn fetch_next(&mut self, length: usize) {
219         let range: Range = if let Some(ref shared) = self.stream_shared {
220             Range {
221                 start: shared.read_position.load(atomic::Ordering::Relaxed),
222                 length: length,
223             }
224         } else {
225             return;
226         };
227         self.fetch(range);
228     }
229 
fetch_next_blocking(&mut self, length: usize)230     pub fn fetch_next_blocking(&mut self, length: usize) {
231         let range: Range = if let Some(ref shared) = self.stream_shared {
232             Range {
233                 start: shared.read_position.load(atomic::Ordering::Relaxed),
234                 length: length,
235             }
236         } else {
237             return;
238         };
239         self.fetch_blocking(range);
240     }
241 
set_random_access_mode(&mut self)242     pub fn set_random_access_mode(&mut self) {
243         // optimise download strategy for random access
244         self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
245     }
246 
set_stream_mode(&mut self)247     pub fn set_stream_mode(&mut self) {
248         // optimise download strategy for streaming
249         self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
250     }
251 
close(&mut self)252     pub fn close(&mut self) {
253         // terminate stream loading and don't load any more data for this file.
254         self.send_stream_loader_command(StreamLoaderCommand::Close());
255     }
256 }
257 
/// An audio file that is read from a temporary file while a background fetcher
/// fills that file with downloaded data.
pub struct AudioFileStreaming {
    // Handle on the temporary file, obtained by reopening the fetcher's write handle.
    read_file: fs::File,

    // Starts at 0; presumably the current read offset within the file (the code
    // that updates it is not part of this section) — TODO confirm.
    position: u64,

    // Command channel to the background fetcher.
    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,

    // State shared with the fetcher and with `StreamLoaderController`s.
    shared: Arc<AudioFileShared>,
}
267 
/// Bookkeeping of which parts of the file have been asked for and which have arrived.
/// Guarded by the `download_status` mutex in `AudioFileShared`.
struct AudioFileDownloadStatus {
    // Ranges for which a request has been sent; whatever is still missing when a
    // data receiver finishes is removed again.
    requested: RangeSet,
    // Ranges consulted to decide whether data can be read without waiting.
    downloaded: RangeSet,
}
272 
/// Download strategy of the stream loader. A newly opened streaming file starts
/// out in `RandomAccess` mode.
#[derive(Copy, Clone)]
enum DownloadStrategy {
    RandomAccess(),
    Streaming(),
}
278 
/// State shared (via `Arc`) between the reader, the stream loader controllers and
/// the background download tasks of one streamed file.
struct AudioFileShared {
    // Identifier of the file being downloaded.
    file_id: FileId,
    // Total size of the file in bytes.
    file_size: usize,
    // Nominal data rate of the stream in bytes per second.
    stream_data_rate: usize,
    // Notified whenever `download_status` changes in a way waiters care about;
    // used together with the `download_status` mutex.
    cond: Condvar,
    // Requested and downloaded ranges.
    download_status: Mutex<AudioFileDownloadStatus>,
    // Currently selected download strategy.
    download_strategy: Mutex<DownloadStrategy>,
    // Number of data receivers that have been created and have not finished yet.
    number_of_open_requests: AtomicUsize,
    // Ping time in milliseconds; initialised to 0.
    ping_time_ms: AtomicUsize,
    // Read position within the file in bytes; initialised to 0.
    read_position: AtomicUsize,
}
290 
291 impl AudioFileOpenStreaming {
finish(&mut self, size: usize) -> AudioFileStreaming292     fn finish(&mut self, size: usize) -> AudioFileStreaming {
293         let shared = Arc::new(AudioFileShared {
294             file_id: self.file_id,
295             file_size: size,
296             stream_data_rate: self.streaming_data_rate,
297             cond: Condvar::new(),
298             download_status: Mutex::new(AudioFileDownloadStatus {
299                 requested: RangeSet::new(),
300                 downloaded: RangeSet::new(),
301             }),
302             download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
303             number_of_open_requests: AtomicUsize::new(0),
304             ping_time_ms: AtomicUsize::new(0),
305             read_position: AtomicUsize::new(0),
306         });
307 
308         let mut write_file = NamedTempFile::new().unwrap();
309         write_file.as_file().set_len(size as u64).unwrap();
310         write_file.seek(SeekFrom::Start(0)).unwrap();
311 
312         let read_file = write_file.reopen().unwrap();
313 
314         let initial_data_rx = self.initial_data_rx.take().unwrap();
315         let initial_data_length = self.initial_data_length.take().unwrap();
316         let complete_tx = self.complete_tx.take().unwrap();
317         //let (seek_tx, seek_rx) = mpsc::unbounded();
318         let (stream_loader_command_tx, stream_loader_command_rx) =
319             mpsc::unbounded::<StreamLoaderCommand>();
320 
321         let fetcher = AudioFileFetch::new(
322             self.session.clone(),
323             shared.clone(),
324             initial_data_rx,
325             self.initial_request_sent_time,
326             initial_data_length,
327             write_file,
328             stream_loader_command_rx,
329             complete_tx,
330         );
331         self.session.spawn(move |_| fetcher);
332 
333         AudioFileStreaming {
334             read_file: read_file,
335 
336             position: 0,
337             //seek: seek_tx,
338             stream_loader_command_tx: stream_loader_command_tx,
339 
340             shared: shared,
341         }
342     }
343 }
344 
345 impl Future for AudioFileOpen {
346     type Item = AudioFile;
347     type Error = ChannelError;
348 
poll(&mut self) -> Poll<AudioFile, ChannelError>349     fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
350         match *self {
351             AudioFileOpen::Streaming(ref mut open) => {
352                 let file = try_ready!(open.poll());
353                 Ok(Async::Ready(AudioFile::Streaming(file)))
354             }
355             AudioFileOpen::Cached(ref mut file) => {
356                 let file = file.take().unwrap();
357                 Ok(Async::Ready(AudioFile::Cached(file)))
358             }
359         }
360     }
361 }
362 
363 impl Future for AudioFileOpenStreaming {
364     type Item = AudioFileStreaming;
365     type Error = ChannelError;
366 
poll(&mut self) -> Poll<AudioFileStreaming, ChannelError>367     fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
368         loop {
369             let (id, data) = try_ready!(self.headers.poll()).unwrap();
370 
371             if id == 0x3 {
372                 let size = BigEndian::read_u32(&data) as usize * 4;
373                 let file = self.finish(size);
374 
375                 return Ok(Async::Ready(file));
376             }
377         }
378     }
379 }
380 
381 impl AudioFile {
open( session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool, ) -> AudioFileOpen382     pub fn open(
383         session: &Session,
384         file_id: FileId,
385         bytes_per_second: usize,
386         play_from_beginning: bool,
387     ) -> AudioFileOpen {
388         let cache = session.cache().cloned();
389 
390         if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
391             debug!("File {} already in cache", file_id);
392             return AudioFileOpen::Cached(Some(file));
393         }
394 
395         debug!("Downloading file {}", file_id);
396 
397         let (complete_tx, complete_rx) = oneshot::channel();
398         let mut initial_data_length = if play_from_beginning {
399             INITIAL_DOWNLOAD_SIZE
400                 + max(
401                     (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
402                     (INITIAL_PING_TIME_ESTIMATE_SECONDS
403                         * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
404                         * bytes_per_second as f64) as usize,
405                 )
406         } else {
407             INITIAL_DOWNLOAD_SIZE
408         };
409         if initial_data_length % 4 != 0 {
410             initial_data_length += 4 - (initial_data_length % 4);
411         }
412         let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
413 
414         let open = AudioFileOpenStreaming {
415             session: session.clone(),
416             file_id: file_id,
417 
418             headers: headers,
419             initial_data_rx: Some(data),
420             initial_data_length: Some(initial_data_length),
421             initial_request_sent_time: Instant::now(),
422 
423             complete_tx: Some(complete_tx),
424             streaming_data_rate: bytes_per_second,
425         };
426 
427         let session_ = session.clone();
428         session.spawn(move |_| {
429             complete_rx
430                 .map(move |mut file| {
431                     if let Some(cache) = session_.cache() {
432                         cache.save_file(file_id, &mut file);
433                         debug!("File {} complete, saving to cache", file_id);
434                     } else {
435                         debug!("File {} complete", file_id);
436                     }
437                 })
438                 .or_else(|oneshot::Canceled| Ok(()))
439         });
440 
441         return AudioFileOpen::Streaming(open);
442     }
443 
get_stream_loader_controller(&self) -> StreamLoaderController444     pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
445         match self {
446             AudioFile::Streaming(ref stream) => {
447                 return StreamLoaderController {
448                     channel_tx: Some(stream.stream_loader_command_tx.clone()),
449                     stream_shared: Some(stream.shared.clone()),
450                     file_size: stream.shared.file_size,
451                 };
452             }
453             AudioFile::Cached(ref file) => {
454                 return StreamLoaderController {
455                     channel_tx: None,
456                     stream_shared: None,
457                     file_size: file.metadata().unwrap().len() as usize,
458                 };
459             }
460         }
461     }
462 }
463 
request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel464 fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
465     assert!(
466         offset % 4 == 0,
467         "Range request start positions must be aligned by 4 bytes."
468     );
469     assert!(
470         length % 4 == 0,
471         "Range request range lengths must be aligned by 4 bytes."
472     );
473     let start = offset / 4;
474     let end = (offset + length) / 4;
475 
476     let (id, channel) = session.channel().allocate();
477 
478     let mut data: Vec<u8> = Vec::new();
479     data.write_u16::<BigEndian>(id).unwrap();
480     data.write_u8(0).unwrap();
481     data.write_u8(1).unwrap();
482     data.write_u16::<BigEndian>(0x0000).unwrap();
483     data.write_u32::<BigEndian>(0x00000000).unwrap();
484     data.write_u32::<BigEndian>(0x00009C40).unwrap();
485     data.write_u32::<BigEndian>(0x00020000).unwrap();
486     data.write(&file.0).unwrap();
487     data.write_u32::<BigEndian>(start as u32).unwrap();
488     data.write_u32::<BigEndian>(end as u32).unwrap();
489 
490     session.send_packet(0x8, data);
491 
492     channel
493 }
494 
/// A chunk of file data received from the server.
struct PartialFileData {
    // Byte offset within the file at which `data` starts.
    offset: usize,
    // The received bytes.
    data: Bytes,
}
499 
/// Messages sent from the data receivers to the `AudioFileFetch` task.
enum ReceivedData {
    /// A measured response time in milliseconds (capped by the receiver at
    /// `MAXIMUM_ASSUMED_PING_TIME_SECONDS`).
    ResponseTimeMs(usize),
    /// A chunk of file data.
    Data(PartialFileData),
}
504 
/// Future that receives the response to one range request and forwards the data
/// (and, if applicable, a ping time measurement) to the `AudioFileFetch` task.
struct AudioFileFetchDataReceiver {
    // State shared with the rest of the download machinery.
    shared: Arc<AudioFileShared>,
    // Channel on which received data and ping times are forwarded.
    file_data_tx: mpsc::UnboundedSender<ReceivedData>,
    // Data half of the channel of the range request.
    data_rx: ChannelData,
    // Offset and length of the range as originally requested; only used in log messages.
    initial_data_offset: usize,
    initial_request_length: usize,
    // File offset of the next chunk to be received; advanced as data arrives.
    data_offset: usize,
    // Number of requested bytes that have not been received yet.
    request_length: usize,
    // Time at which the request was sent; basis of the ping time measurement.
    request_sent_time: Option<Instant>,
    // Whether this receiver reports a ping time; set when no other request was
    // open at creation time and cleared after the first report.
    measure_ping_time: bool,
}
516 
517 impl AudioFileFetchDataReceiver {
new( shared: Arc<AudioFileShared>, file_data_tx: mpsc::UnboundedSender<ReceivedData>, data_rx: ChannelData, data_offset: usize, request_length: usize, request_sent_time: Instant, ) -> AudioFileFetchDataReceiver518     fn new(
519         shared: Arc<AudioFileShared>,
520         file_data_tx: mpsc::UnboundedSender<ReceivedData>,
521         data_rx: ChannelData,
522         data_offset: usize,
523         request_length: usize,
524         request_sent_time: Instant,
525     ) -> AudioFileFetchDataReceiver {
526         let measure_ping_time = shared
527             .number_of_open_requests
528             .load(atomic::Ordering::SeqCst)
529             == 0;
530 
531         shared
532             .number_of_open_requests
533             .fetch_add(1, atomic::Ordering::SeqCst);
534 
535         AudioFileFetchDataReceiver {
536             shared: shared,
537             data_rx: data_rx,
538             file_data_tx: file_data_tx,
539             initial_data_offset: data_offset,
540             initial_request_length: request_length,
541             data_offset: data_offset,
542             request_length: request_length,
543             request_sent_time: Some(request_sent_time),
544             measure_ping_time: measure_ping_time,
545         }
546     }
547 }
548 
549 impl AudioFileFetchDataReceiver {
finish(&mut self)550     fn finish(&mut self) {
551         if self.request_length > 0 {
552             let missing_range = Range::new(self.data_offset, self.request_length);
553 
554             let mut download_status = self.shared.download_status.lock().unwrap();
555             download_status.requested.subtract_range(&missing_range);
556             self.shared.cond.notify_all();
557         }
558 
559         self.shared
560             .number_of_open_requests
561             .fetch_sub(1, atomic::Ordering::SeqCst);
562     }
563 }
564 
565 impl Future for AudioFileFetchDataReceiver {
566     type Item = ();
567     type Error = ();
568 
poll(&mut self) -> Poll<(), ()>569     fn poll(&mut self) -> Poll<(), ()> {
570         loop {
571             match self.data_rx.poll() {
572                 Ok(Async::Ready(Some(data))) => {
573                     if self.measure_ping_time {
574                         if let Some(request_sent_time) = self.request_sent_time {
575                             let duration = Instant::now() - request_sent_time;
576                             let duration_ms: u64;
577                             if 0.001 * (duration.as_millis() as f64)
578                                 > MAXIMUM_ASSUMED_PING_TIME_SECONDS
579                             {
580                                 duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
581                             } else {
582                                 duration_ms = duration.as_millis() as u64;
583                             }
584                             let _ = self
585                                 .file_data_tx
586                                 .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
587                             self.measure_ping_time = false;
588                         }
589                     }
590                     let data_size = data.len();
591                     let _ = self
592                         .file_data_tx
593                         .unbounded_send(ReceivedData::Data(PartialFileData {
594                             offset: self.data_offset,
595                             data: data,
596                         }));
597                     self.data_offset += data_size;
598                     if self.request_length < data_size {
599                         warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
600                         self.request_length = 0;
601                     } else {
602                         self.request_length -= data_size;
603                     }
604                     if self.request_length == 0 {
605                         self.finish();
606                         return Ok(Async::Ready(()));
607                     }
608                 }
609                 Ok(Async::Ready(None)) => {
610                     if self.request_length > 0 {
611                         warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
612                     }
613                     self.finish();
614                     return Ok(Async::Ready(()));
615                 }
616                 Ok(Async::NotReady) => {
617                     return Ok(Async::NotReady);
618                 }
619                 Err(ChannelError) => {
620                     warn!(
621                         "Error from channel for data receiver for range {} (+{}).",
622                         self.initial_data_offset, self.initial_request_length
623                     );
624                     self.finish();
625                     return Ok(Async::Ready(()));
626                 }
627             }
628         }
629     }
630 }
631 
/// Background task that drives the download of one file: it issues range
/// requests and receives the data forwarded by the data receivers.
struct AudioFileFetch {
    // Session used to send requests and to spawn data receivers.
    session: Session,
    // State shared with the reader and the controllers.
    shared: Arc<AudioFileShared>,
    // Temporary file handed over at construction.
    output: Option<NamedTempFile>,

    // Sender cloned into every data receiver.
    file_data_tx: mpsc::UnboundedSender<ReceivedData>,
    // Receiving end on which data and ping times from the receivers arrive.
    file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,

    // Commands from `StreamLoaderController`s.
    stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
    // Sender for handing over the temporary file (see `AudioFile::open` for the receiving side).
    complete_tx: Option<oneshot::Sender<NamedTempFile>>,
    // Most recent response times in milliseconds; at most three are kept.
    network_response_times_ms: Vec<usize>,
}
644 
645 impl AudioFileFetch {
new( session: Session, shared: Arc<AudioFileShared>, initial_data_rx: ChannelData, initial_request_sent_time: Instant, initial_data_length: usize, output: NamedTempFile, stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>, complete_tx: oneshot::Sender<NamedTempFile>, ) -> AudioFileFetch646     fn new(
647         session: Session,
648         shared: Arc<AudioFileShared>,
649         initial_data_rx: ChannelData,
650         initial_request_sent_time: Instant,
651         initial_data_length: usize,
652 
653         output: NamedTempFile,
654         stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
655         complete_tx: oneshot::Sender<NamedTempFile>,
656     ) -> AudioFileFetch {
657         let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
658 
659         {
660             let requested_range = Range::new(0, initial_data_length);
661             let mut download_status = shared.download_status.lock().unwrap();
662             download_status.requested.add_range(&requested_range);
663         }
664 
665         let initial_data_receiver = AudioFileFetchDataReceiver::new(
666             shared.clone(),
667             file_data_tx.clone(),
668             initial_data_rx,
669             0,
670             initial_data_length,
671             initial_request_sent_time,
672         );
673 
674         session.spawn(move |_| initial_data_receiver);
675 
676         AudioFileFetch {
677             session: session,
678             shared: shared,
679             output: Some(output),
680 
681             file_data_tx: file_data_tx,
682             file_data_rx: file_data_rx,
683 
684             stream_loader_command_rx: stream_loader_command_rx,
685             complete_tx: Some(complete_tx),
686             network_response_times_ms: Vec::new(),
687         }
688     }
689 
get_download_strategy(&mut self) -> DownloadStrategy690     fn get_download_strategy(&mut self) -> DownloadStrategy {
691         *(self.shared.download_strategy.lock().unwrap())
692     }
693 
download_range(&mut self, mut offset: usize, mut length: usize)694     fn download_range(&mut self, mut offset: usize, mut length: usize) {
695         if length < MINIMUM_DOWNLOAD_SIZE {
696             length = MINIMUM_DOWNLOAD_SIZE;
697         }
698 
699         // ensure the values are within the bounds and align them by 4 for the spotify protocol.
700         if offset >= self.shared.file_size {
701             return;
702         }
703 
704         if length <= 0 {
705             return;
706         }
707 
708         if offset + length > self.shared.file_size {
709             length = self.shared.file_size - offset;
710         }
711 
712         if offset % 4 != 0 {
713             length += offset % 4;
714             offset -= offset % 4;
715         }
716 
717         if length % 4 != 0 {
718             length += 4 - (length % 4);
719         }
720 
721         let mut ranges_to_request = RangeSet::new();
722         ranges_to_request.add_range(&Range::new(offset, length));
723 
724         let mut download_status = self.shared.download_status.lock().unwrap();
725 
726         ranges_to_request.subtract_range_set(&download_status.downloaded);
727         ranges_to_request.subtract_range_set(&download_status.requested);
728 
729         for range in ranges_to_request.iter() {
730             let (_headers, data) = request_range(
731                 &self.session,
732                 self.shared.file_id,
733                 range.start,
734                 range.length,
735             )
736             .split();
737 
738             download_status.requested.add_range(range);
739 
740             let receiver = AudioFileFetchDataReceiver::new(
741                 self.shared.clone(),
742                 self.file_data_tx.clone(),
743                 data,
744                 range.start,
745                 range.length,
746                 Instant::now(),
747             );
748 
749             self.session.spawn(move |_| receiver);
750         }
751     }
752 
pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize)753     fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) {
754         let mut bytes_to_go = bytes;
755         let mut requests_to_go = max_requests_to_send;
756 
757         while bytes_to_go > 0 && requests_to_go > 0 {
758             // determine what is still missing
759             let mut missing_data = RangeSet::new();
760             missing_data.add_range(&Range::new(0, self.shared.file_size));
761             {
762                 let download_status = self.shared.download_status.lock().unwrap();
763                 missing_data.subtract_range_set(&download_status.downloaded);
764                 missing_data.subtract_range_set(&download_status.requested);
765             }
766 
767             // download data from after the current read position first
768             let mut tail_end = RangeSet::new();
769             let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
770             tail_end.add_range(&Range::new(
771                 read_position,
772                 self.shared.file_size - read_position,
773             ));
774             let tail_end = tail_end.intersection(&missing_data);
775 
776             if !tail_end.is_empty() {
777                 let range = tail_end.get_range(0);
778                 let offset = range.start;
779                 let length = min(range.length, bytes_to_go);
780                 self.download_range(offset, length);
781                 requests_to_go -= 1;
782                 bytes_to_go -= length;
783             } else if !missing_data.is_empty() {
784                 // ok, the tail is downloaded, download something fom the beginning.
785                 let range = missing_data.get_range(0);
786                 let offset = range.start;
787                 let length = min(range.length, bytes_to_go);
788                 self.download_range(offset, length);
789                 requests_to_go -= 1;
790                 bytes_to_go -= length;
791             } else {
792                 return;
793             }
794         }
795     }
796 
poll_file_data_rx(&mut self) -> Poll<(), ()>797     fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
798         loop {
799             match self.file_data_rx.poll() {
800                 Ok(Async::Ready(None)) => {
801                     return Ok(Async::Ready(()));
802                 }
803                 Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
804                     trace!("Ping time estimated as: {} ms.", response_time_ms);
805 
806                     // record the response time
807                     self.network_response_times_ms.push(response_time_ms);
808 
809                     // prune old response times. Keep at most three.
810                     while self.network_response_times_ms.len() > 3 {
811                         self.network_response_times_ms.remove(0);
812                     }
813 
814                     // stats::median is experimental. So we calculate the median of up to three ourselves.
815                     let ping_time_ms: usize = match self.network_response_times_ms.len() {
816                         1 => self.network_response_times_ms[0] as usize,
817                         2 => {
818                             ((self.network_response_times_ms[0]
819                                 + self.network_response_times_ms[1])
820                                 / 2) as usize
821                         }
822                         3 => {
823                             let mut times = self.network_response_times_ms.clone();
824                             times.sort();
825                             times[1]
826                         }
827                         _ => unreachable!(),
828                     };
829 
830                     // store our new estimate for everyone to see
831                     self.shared
832                         .ping_time_ms
833                         .store(ping_time_ms, atomic::Ordering::Relaxed);
834                 }
835                 Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
836                     self.output
837                         .as_mut()
838                         .unwrap()
839                         .seek(SeekFrom::Start(data.offset as u64))
840                         .unwrap();
841                     self.output
842                         .as_mut()
843                         .unwrap()
844                         .write_all(data.data.as_ref())
845                         .unwrap();
846 
847                     let mut full = false;
848 
849                     {
850                         let mut download_status = self.shared.download_status.lock().unwrap();
851 
852                         let received_range = Range::new(data.offset, data.data.len());
853                         download_status.downloaded.add_range(&received_range);
854                         self.shared.cond.notify_all();
855 
856                         if download_status.downloaded.contained_length_from_value(0)
857                             >= self.shared.file_size
858                         {
859                             full = true;
860                         }
861 
862                         drop(download_status);
863                     }
864 
865                     if full {
866                         self.finish();
867                         return Ok(Async::Ready(()));
868                     }
869                 }
870                 Ok(Async::NotReady) => {
871                     return Ok(Async::NotReady);
872                 }
873                 Err(()) => unreachable!(),
874             }
875         }
876     }
877 
poll_stream_loader_command_rx(&mut self) -> Poll<(), ()>878     fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
879         loop {
880             match self.stream_loader_command_rx.poll() {
881                 Ok(Async::Ready(None)) => {
882                     return Ok(Async::Ready(()));
883                 }
884                 Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
885                     self.download_range(request.start, request.length);
886                 }
887                 Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
888                     *(self.shared.download_strategy.lock().unwrap()) =
889                         DownloadStrategy::RandomAccess();
890                 }
891                 Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
892                     *(self.shared.download_strategy.lock().unwrap()) =
893                         DownloadStrategy::Streaming();
894                 }
895                 Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
896                     return Ok(Async::Ready(()));
897                 }
898                 Ok(Async::NotReady) => return Ok(Async::NotReady),
899                 Err(()) => unreachable!(),
900             }
901         }
902     }
903 
finish(&mut self)904     fn finish(&mut self) {
905         let mut output = self.output.take().unwrap();
906         let complete_tx = self.complete_tx.take().unwrap();
907 
908         output.seek(SeekFrom::Start(0)).unwrap();
909         let _ = complete_tx.send(output);
910     }
911 }
912 
913 impl Future for AudioFileFetch {
914     type Item = ();
915     type Error = ();
916 
poll(&mut self) -> Poll<(), ()>917     fn poll(&mut self) -> Poll<(), ()> {
918         match self.poll_stream_loader_command_rx() {
919             Ok(Async::NotReady) => (),
920             Ok(Async::Ready(_)) => {
921                 return Ok(Async::Ready(()));
922             }
923             Err(()) => unreachable!(),
924         }
925 
926         match self.poll_file_data_rx() {
927             Ok(Async::NotReady) => (),
928             Ok(Async::Ready(_)) => {
929                 return Ok(Async::Ready(()));
930             }
931             Err(()) => unreachable!(),
932         }
933 
934         if let DownloadStrategy::Streaming() = self.get_download_strategy() {
935             let number_of_open_requests = self
936                 .shared
937                 .number_of_open_requests
938                 .load(atomic::Ordering::SeqCst);
939             let max_requests_to_send =
940                 MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
941 
942             if max_requests_to_send > 0 {
943                 let bytes_pending: usize = {
944                     let download_status = self.shared.download_status.lock().unwrap();
945                     download_status
946                         .requested
947                         .minus(&download_status.downloaded)
948                         .len()
949                 };
950 
951                 let ping_time_seconds =
952                     0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
953                 let download_rate = self.session.channel().get_download_rate_estimate();
954 
955                 let desired_pending_bytes = max(
956                     (PREFETCH_THRESHOLD_FACTOR
957                         * ping_time_seconds
958                         * self.shared.stream_data_rate as f64) as usize,
959                     (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
960                         as usize,
961                 );
962 
963                 if bytes_pending < desired_pending_bytes {
964                     self.pre_fetch_more_data(
965                         desired_pending_bytes - bytes_pending,
966                         max_requests_to_send,
967                     );
968                 }
969             }
970         }
971 
972         return Ok(Async::NotReady);
973     }
974 }
975 
976 impl Read for AudioFileStreaming {
read(&mut self, output: &mut [u8]) -> io::Result<usize>977     fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
978         let offset = self.position as usize;
979 
980         if offset >= self.shared.file_size {
981             return Ok(0);
982         }
983 
984         let length = min(output.len(), self.shared.file_size - offset);
985 
986         let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
987             DownloadStrategy::RandomAccess() => length,
988             DownloadStrategy::Streaming() => {
989                 // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
990                 let ping_time_seconds =
991                     0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
992 
993                 let length_to_request = length
994                     + max(
995                         (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
996                             as usize,
997                         (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
998                             * ping_time_seconds
999                             * self.shared.stream_data_rate as f64) as usize,
1000                     );
1001                 min(length_to_request, self.shared.file_size - offset)
1002             }
1003         };
1004 
1005         let mut ranges_to_request = RangeSet::new();
1006         ranges_to_request.add_range(&Range::new(offset, length_to_request));
1007 
1008         let mut download_status = self.shared.download_status.lock().unwrap();
1009         ranges_to_request.subtract_range_set(&download_status.downloaded);
1010         ranges_to_request.subtract_range_set(&download_status.requested);
1011 
1012         for range in ranges_to_request.iter() {
1013             self.stream_loader_command_tx
1014                 .unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
1015                 .unwrap();
1016         }
1017 
1018         if length == 0 {
1019             return Ok(0);
1020         }
1021 
1022         let mut download_message_printed = false;
1023         while !download_status.downloaded.contains(offset) {
1024             if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() {
1025                 if !download_message_printed {
1026                     debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
1027                     download_message_printed = true;
1028                 }
1029             }
1030             download_status = self
1031                 .shared
1032                 .cond
1033                 .wait_timeout(download_status, Duration::from_millis(1000))
1034                 .unwrap()
1035                 .0;
1036         }
1037         let available_length = download_status
1038             .downloaded
1039             .contained_length_from_value(offset);
1040         assert!(available_length > 0);
1041         drop(download_status);
1042 
1043         self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
1044         let read_len = min(length, available_length);
1045         let read_len = self.read_file.read(&mut output[..read_len])?;
1046 
1047         if download_message_printed {
1048             debug!(
1049                 "Read at postion {} completed. {} bytes returned, {} bytes were requested.",
1050                 offset,
1051                 read_len,
1052                 output.len()
1053             );
1054         }
1055 
1056         self.position += read_len as u64;
1057         self.shared
1058             .read_position
1059             .store(self.position as usize, atomic::Ordering::Relaxed);
1060 
1061         return Ok(read_len);
1062     }
1063 }
1064 
1065 impl Seek for AudioFileStreaming {
seek(&mut self, pos: SeekFrom) -> io::Result<u64>1066     fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1067         self.position = self.read_file.seek(pos)?;
1068         // Do not seek past EOF
1069         self.shared
1070             .read_position
1071             .store(self.position as usize, atomic::Ordering::Relaxed);
1072         Ok(self.position)
1073     }
1074 }
1075 
1076 impl Read for AudioFile {
read(&mut self, output: &mut [u8]) -> io::Result<usize>1077     fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
1078         match *self {
1079             AudioFile::Cached(ref mut file) => file.read(output),
1080             AudioFile::Streaming(ref mut file) => file.read(output),
1081         }
1082     }
1083 }
1084 
1085 impl Seek for AudioFile {
seek(&mut self, pos: SeekFrom) -> io::Result<u64>1086     fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1087         match *self {
1088             AudioFile::Cached(ref mut file) => file.seek(pos),
1089             AudioFile::Streaming(ref mut file) => file.seek(pos),
1090         }
1091     }
1092 }
1093