1 use crate::range_set::{Range, RangeSet};
2 use byteorder::{BigEndian, ByteOrder, WriteBytesExt};
3 use bytes::Bytes;
4 use futures::sync::{mpsc, oneshot};
5 use futures::Stream;
6 use futures::{Async, Future, Poll};
7 use std::cmp::{max, min};
8 use std::fs;
9 use std::io::{self, Read, Seek, SeekFrom, Write};
10 use std::sync::{Arc, Condvar, Mutex};
11 use std::time::{Duration, Instant};
12 use tempfile::NamedTempFile;
13
14 use futures::sync::mpsc::unbounded;
15 use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders};
16 use librespot_core::session::Session;
17 use librespot_core::spotify_id::FileId;
18 use std::sync::atomic;
19 use std::sync::atomic::AtomicUsize;
20
const MINIMUM_DOWNLOAD_SIZE: usize = 1024 * 16;
// The minimum size of a block that is requested from the Spotify servers in one request.
// This is the block size that is typically requested while doing a seek() on a file.
// Note: smaller requests can happen if part of the block is downloaded already.

const INITIAL_DOWNLOAD_SIZE: usize = 1024 * 16;
// The amount of data that is requested when initially opening a file.
// Note: if the file is opened to play from the beginning, the amount of data to
// read ahead is requested in addition to this amount. If the file is opened to seek to
// another position, then only this amount is requested on the first request.

const INITIAL_PING_TIME_ESTIMATE_SECONDS: f64 = 0.5;
// The ping time that is used for calculations before a ping time was actually measured.

const MAXIMUM_ASSUMED_PING_TIME_SECONDS: f64 = 1.5;
// If the measured ping time to the Spotify server is larger than this value, it is capped
// to avoid run-away block sizes and pre-fetching.

pub const READ_AHEAD_BEFORE_PLAYBACK_SECONDS: f64 = 1.0;
// Before playback starts, this many seconds of data must be present.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS: f64 = 2.0;
// Same as READ_AHEAD_BEFORE_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Both READ_AHEAD_BEFORE_PLAYBACK_SECONDS and READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS are
// obeyed.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_DURING_PLAYBACK_SECONDS: f64 = 5.0;
// While playing back, this many seconds of data ahead of the current read position are
// requested.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

pub const READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS: f64 = 10.0;
// Same as READ_AHEAD_DURING_PLAYBACK_SECONDS, but the time is taken as a factor of the ping
// time to the Spotify server.
// Note: the calculations are done using the nominal bitrate of the file. The actual amount
// of audio data may be larger or smaller.

const PREFETCH_THRESHOLD_FACTOR: f64 = 4.0;
// If the amount of data that is pending (requested but not received) is less than a certain amount,
// data is pre-fetched in addition to the read ahead settings above. The threshold for requesting more
// data is calculated as
// <pending bytes> < PREFETCH_THRESHOLD_FACTOR * <ping time> * <nominal data rate>

const FAST_PREFETCH_THRESHOLD_FACTOR: f64 = 1.5;
// Similar to PREFETCH_THRESHOLD_FACTOR, but it also takes the current download rate into account.
// The formula used is
// <pending bytes> < FAST_PREFETCH_THRESHOLD_FACTOR * <ping time> * <measured download rate>
// This mechanism allows for fast downloading of the remainder of the file. The number should be larger
// than 1 so the download rate ramps up until the bandwidth is saturated. The larger the value, the faster
// the download rate ramps up. However, this comes at the cost that it might hurt ping-time if a seek is
// performed while downloading. Values smaller than 1 cause the download rate to collapse and effectively
// only PREFETCH_THRESHOLD_FACTOR is in effect. Thus, set to zero if bandwidth saturation is not wanted.

const MAX_PREFETCH_REQUESTS: usize = 4;
// Limit the number of requests that are pending simultaneously before pre-fetching data. Pending
// requests share bandwidth. Thus, having too many requests can lead to the one that is needed next
// for playback to be delayed, leading to a buffer underrun. This limit has the effect that a new
// pre-fetch request is only sent if fewer than MAX_PREFETCH_REQUESTS are pending.
85
/// An audio file that is either served from the local cache or streamed while
/// it is being downloaded.
pub enum AudioFile {
    /// The file was found in the cache and is read directly from disk.
    Cached(fs::File),
    /// The file is downloaded in the background while it is read.
    Streaming(AudioFileStreaming),
}
90
/// Future returned by `AudioFile::open`; it resolves to an `AudioFile`.
pub enum AudioFileOpen {
    /// The file comes from the cache. The `Option` is taken (and must be `Some`)
    /// when the future is polled.
    Cached(Option<fs::File>),
    /// The file has to be downloaded; resolves once the streaming open completes.
    Streaming(AudioFileOpenStreaming),
}
95
/// State of a streaming file that is being opened: the initial range request
/// has been sent and its headers are awaited to learn the file size.
pub struct AudioFileOpenStreaming {
    /// Session used to send requests and to spawn background tasks.
    session: Session,
    /// Data stream of the initial request; taken when the open completes.
    initial_data_rx: Option<ChannelData>,
    /// Number of bytes asked for by the initial request; taken when the open completes.
    initial_data_length: Option<usize>,
    /// Time stamp associated with the initial request, passed on for the ping time measurement.
    initial_request_sent_time: Instant,
    /// Header stream of the initial request.
    headers: ChannelHeaders,
    /// Identifier of the file being opened.
    file_id: FileId,
    /// Sender on which the completed temporary file is handed over; taken when the open completes.
    complete_tx: Option<oneshot::Sender<NamedTempFile>>,
    /// Nominal data rate of the stream in bytes per second.
    streaming_data_rate: usize,
}
106
/// Commands sent from the reading side to the background stream loader.
enum StreamLoaderCommand {
    /// Signal the stream loader to fetch a range of the file.
    Fetch(Range),
    /// Optimise the download strategy for random access.
    RandomAccessMode(),
    /// Optimise the download strategy for streaming.
    StreamMode(),
    /// Terminate and don't load any more data.
    Close(),
}
113
/// Handle for steering the download of an `AudioFile` and for querying its
/// download state.
#[derive(Clone)]
pub struct StreamLoaderController {
    /// Command channel to the stream loader; `None` for cached files.
    channel_tx: Option<mpsc::UnboundedSender<StreamLoaderCommand>>,
    /// State shared with the stream loader; `None` for cached files.
    stream_shared: Option<Arc<AudioFileShared>>,
    /// Total size of the file in bytes.
    file_size: usize,
}
120
121 impl StreamLoaderController {
len(&self) -> usize122 pub fn len(&self) -> usize {
123 return self.file_size;
124 }
125
range_available(&self, range: Range) -> bool126 pub fn range_available(&self, range: Range) -> bool {
127 if let Some(ref shared) = self.stream_shared {
128 let download_status = shared.download_status.lock().unwrap();
129 if range.length
130 <= download_status
131 .downloaded
132 .contained_length_from_value(range.start)
133 {
134 return true;
135 } else {
136 return false;
137 }
138 } else {
139 if range.length <= self.len() - range.start {
140 return true;
141 } else {
142 return false;
143 }
144 }
145 }
146
ping_time_ms(&self) -> usize147 pub fn ping_time_ms(&self) -> usize {
148 if let Some(ref shared) = self.stream_shared {
149 return shared.ping_time_ms.load(atomic::Ordering::Relaxed);
150 } else {
151 return 0;
152 }
153 }
154
send_stream_loader_command(&mut self, command: StreamLoaderCommand)155 fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) {
156 if let Some(ref mut channel) = self.channel_tx {
157 // ignore the error in case the channel has been closed already.
158 let _ = channel.unbounded_send(command);
159 }
160 }
161
fetch(&mut self, range: Range)162 pub fn fetch(&mut self, range: Range) {
163 // signal the stream loader to fetch a range of the file
164 self.send_stream_loader_command(StreamLoaderCommand::Fetch(range));
165 }
166
fetch_blocking(&mut self, mut range: Range)167 pub fn fetch_blocking(&mut self, mut range: Range) {
168 // signal the stream loader to tech a range of the file and block until it is loaded.
169
170 // ensure the range is within the file's bounds.
171 if range.start >= self.len() {
172 range.length = 0;
173 } else if range.end() > self.len() {
174 range.length = self.len() - range.start;
175 }
176
177 self.fetch(range);
178
179 if let Some(ref shared) = self.stream_shared {
180 let mut download_status = shared.download_status.lock().unwrap();
181 while range.length
182 > download_status
183 .downloaded
184 .contained_length_from_value(range.start)
185 {
186 download_status = shared
187 .cond
188 .wait_timeout(download_status, Duration::from_millis(1000))
189 .unwrap()
190 .0;
191 if range.length
192 > (download_status
193 .downloaded
194 .union(&download_status.requested)
195 .contained_length_from_value(range.start))
196 {
197 // For some reason, the requested range is neither downloaded nor requested.
198 // This could be due to a network error. Request it again.
199 // We can't use self.fetch here because self can't be borrowed mutably, so we access the channel directly.
200 if let Some(ref mut channel) = self.channel_tx {
201 // ignore the error in case the channel has been closed already.
202 let _ = channel.unbounded_send(StreamLoaderCommand::Fetch(range));
203 }
204 }
205 }
206 }
207 }
208
fetch_next(&mut self, length: usize)209 pub fn fetch_next(&mut self, length: usize) {
210 let range: Range = if let Some(ref shared) = self.stream_shared {
211 Range {
212 start: shared.read_position.load(atomic::Ordering::Relaxed),
213 length: length,
214 }
215 } else {
216 return;
217 };
218 self.fetch(range);
219 }
220
fetch_next_blocking(&mut self, length: usize)221 pub fn fetch_next_blocking(&mut self, length: usize) {
222 let range: Range = if let Some(ref shared) = self.stream_shared {
223 Range {
224 start: shared.read_position.load(atomic::Ordering::Relaxed),
225 length: length,
226 }
227 } else {
228 return;
229 };
230 self.fetch_blocking(range);
231 }
232
set_random_access_mode(&mut self)233 pub fn set_random_access_mode(&mut self) {
234 // optimise download strategy for random access
235 self.send_stream_loader_command(StreamLoaderCommand::RandomAccessMode());
236 }
237
set_stream_mode(&mut self)238 pub fn set_stream_mode(&mut self) {
239 // optimise download strategy for streaming
240 self.send_stream_loader_command(StreamLoaderCommand::StreamMode());
241 }
242
close(&mut self)243 pub fn close(&mut self) {
244 // terminate stream loading and don't load any more data for this file.
245 self.send_stream_loader_command(StreamLoaderCommand::Close());
246 }
247 }
248
/// Reading side of a file that is downloaded in the background.
pub struct AudioFileStreaming {
    /// Read handle onto the temporary file the downloaded data is written to.
    read_file: fs::File,

    /// Current read position in bytes.
    position: u64,

    /// Channel for sending commands to the stream loader.
    stream_loader_command_tx: mpsc::UnboundedSender<StreamLoaderCommand>,

    /// State shared with the stream loader.
    shared: Arc<AudioFileShared>,
}
258
/// Bookkeeping of which parts of the file have been requested and received.
struct AudioFileDownloadStatus {
    /// Ranges for which a request has been sent.
    requested: RangeSet,
    /// Ranges whose data has been received and written to the file.
    downloaded: RangeSet,
}
263
/// Strategy that decides how much data is requested beyond what a read needs.
#[derive(Copy, Clone)]
enum DownloadStrategy {
    /// Only the data that is actually read is requested.
    RandomAccess(),
    /// Data is read ahead and pre-fetched for continuous playback.
    Streaming(),
}
269
/// State shared between the reading side, the stream loader and the
/// per-request data receivers.
struct AudioFileShared {
    /// Identifier of the file being downloaded.
    file_id: FileId,
    /// Total size of the file in bytes.
    file_size: usize,
    /// Nominal data rate of the stream in bytes per second.
    stream_data_rate: usize,
    /// Notified whenever `download_status` changes; used together with that mutex.
    cond: Condvar,
    /// Requested and downloaded ranges of the file.
    download_status: Mutex<AudioFileDownloadStatus>,
    /// Currently selected download strategy.
    download_strategy: Mutex<DownloadStrategy>,
    /// Number of data receivers that have not finished yet.
    number_of_open_requests: AtomicUsize,
    /// Latest ping time estimate in milliseconds (0 until one was measured).
    ping_time_ms: AtomicUsize,
    /// Position up to which the file has been read or to which it was last seeked.
    read_position: AtomicUsize,
}
281
282 impl AudioFileOpenStreaming {
finish(&mut self, size: usize) -> AudioFileStreaming283 fn finish(&mut self, size: usize) -> AudioFileStreaming {
284 let shared = Arc::new(AudioFileShared {
285 file_id: self.file_id,
286 file_size: size,
287 stream_data_rate: self.streaming_data_rate,
288 cond: Condvar::new(),
289 download_status: Mutex::new(AudioFileDownloadStatus {
290 requested: RangeSet::new(),
291 downloaded: RangeSet::new(),
292 }),
293 download_strategy: Mutex::new(DownloadStrategy::RandomAccess()), // start with random access mode until someone tells us otherwise
294 number_of_open_requests: AtomicUsize::new(0),
295 ping_time_ms: AtomicUsize::new(0),
296 read_position: AtomicUsize::new(0),
297 });
298
299 let mut write_file = NamedTempFile::new().unwrap();
300 write_file.as_file().set_len(size as u64).unwrap();
301 write_file.seek(SeekFrom::Start(0)).unwrap();
302
303 let read_file = write_file.reopen().unwrap();
304
305 let initial_data_rx = self.initial_data_rx.take().unwrap();
306 let initial_data_length = self.initial_data_length.take().unwrap();
307 let complete_tx = self.complete_tx.take().unwrap();
308 //let (seek_tx, seek_rx) = mpsc::unbounded();
309 let (stream_loader_command_tx, stream_loader_command_rx) =
310 mpsc::unbounded::<StreamLoaderCommand>();
311
312 let fetcher = AudioFileFetch::new(
313 self.session.clone(),
314 shared.clone(),
315 initial_data_rx,
316 self.initial_request_sent_time,
317 initial_data_length,
318 write_file,
319 stream_loader_command_rx,
320 complete_tx,
321 );
322 self.session.spawn(move |_| fetcher);
323
324 AudioFileStreaming {
325 read_file: read_file,
326
327 position: 0,
328 //seek: seek_tx,
329 stream_loader_command_tx: stream_loader_command_tx,
330
331 shared: shared,
332 }
333 }
334 }
335
336 impl Future for AudioFileOpen {
337 type Item = AudioFile;
338 type Error = ChannelError;
339
poll(&mut self) -> Poll<AudioFile, ChannelError>340 fn poll(&mut self) -> Poll<AudioFile, ChannelError> {
341 match *self {
342 AudioFileOpen::Streaming(ref mut open) => {
343 let file = try_ready!(open.poll());
344 Ok(Async::Ready(AudioFile::Streaming(file)))
345 }
346 AudioFileOpen::Cached(ref mut file) => {
347 let file = file.take().unwrap();
348 Ok(Async::Ready(AudioFile::Cached(file)))
349 }
350 }
351 }
352 }
353
354 impl Future for AudioFileOpenStreaming {
355 type Item = AudioFileStreaming;
356 type Error = ChannelError;
357
poll(&mut self) -> Poll<AudioFileStreaming, ChannelError>358 fn poll(&mut self) -> Poll<AudioFileStreaming, ChannelError> {
359 loop {
360 let (id, data) = try_ready!(self.headers.poll()).unwrap();
361
362 if id == 0x3 {
363 let size = BigEndian::read_u32(&data) as usize * 4;
364 let file = self.finish(size);
365
366 return Ok(Async::Ready(file));
367 }
368 }
369 }
370 }
371
372 impl AudioFile {
open( session: &Session, file_id: FileId, bytes_per_second: usize, play_from_beginning: bool, ) -> AudioFileOpen373 pub fn open(
374 session: &Session,
375 file_id: FileId,
376 bytes_per_second: usize,
377 play_from_beginning: bool,
378 ) -> AudioFileOpen {
379 let cache = session.cache().cloned();
380
381 if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) {
382 debug!("File {} already in cache", file_id);
383 return AudioFileOpen::Cached(Some(file));
384 }
385
386 debug!("Downloading file {}", file_id);
387
388 let (complete_tx, complete_rx) = oneshot::channel();
389 let mut initial_data_length = if play_from_beginning {
390 INITIAL_DOWNLOAD_SIZE
391 + max(
392 (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
393 (INITIAL_PING_TIME_ESTIMATE_SECONDS
394 * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
395 * bytes_per_second as f64) as usize,
396 )
397 } else {
398 INITIAL_DOWNLOAD_SIZE
399 };
400 if initial_data_length % 4 != 0 {
401 initial_data_length += 4 - (initial_data_length % 4);
402 }
403 let (headers, data) = request_range(session, file_id, 0, initial_data_length).split();
404
405 let open = AudioFileOpenStreaming {
406 session: session.clone(),
407 file_id: file_id,
408
409 headers: headers,
410 initial_data_rx: Some(data),
411 initial_data_length: Some(initial_data_length),
412 initial_request_sent_time: Instant::now(),
413
414 complete_tx: Some(complete_tx),
415 streaming_data_rate: bytes_per_second,
416 };
417
418 let session_ = session.clone();
419 session.spawn(move |_| {
420 complete_rx
421 .map(move |mut file| {
422 if let Some(cache) = session_.cache() {
423 cache.save_file(file_id, &mut file);
424 debug!("File {} complete, saving to cache", file_id);
425 } else {
426 debug!("File {} complete", file_id);
427 }
428 })
429 .or_else(|oneshot::Canceled| Ok(()))
430 });
431
432 return AudioFileOpen::Streaming(open);
433 }
434
get_stream_loader_controller(&self) -> StreamLoaderController435 pub fn get_stream_loader_controller(&self) -> StreamLoaderController {
436 match self {
437 AudioFile::Streaming(ref stream) => {
438 return StreamLoaderController {
439 channel_tx: Some(stream.stream_loader_command_tx.clone()),
440 stream_shared: Some(stream.shared.clone()),
441 file_size: stream.shared.file_size,
442 };
443 }
444 AudioFile::Cached(ref file) => {
445 return StreamLoaderController {
446 channel_tx: None,
447 stream_shared: None,
448 file_size: file.metadata().unwrap().len() as usize,
449 };
450 }
451 }
452 }
453 }
454
request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel455 fn request_range(session: &Session, file: FileId, offset: usize, length: usize) -> Channel {
456 assert!(
457 offset % 4 == 0,
458 "Range request start positions must be aligned by 4 bytes."
459 );
460 assert!(
461 length % 4 == 0,
462 "Range request range lengths must be aligned by 4 bytes."
463 );
464 let start = offset / 4;
465 let end = (offset + length) / 4;
466
467 let (id, channel) = session.channel().allocate();
468
469 let mut data: Vec<u8> = Vec::new();
470 data.write_u16::<BigEndian>(id).unwrap();
471 data.write_u8(0).unwrap();
472 data.write_u8(1).unwrap();
473 data.write_u16::<BigEndian>(0x0000).unwrap();
474 data.write_u32::<BigEndian>(0x00000000).unwrap();
475 data.write_u32::<BigEndian>(0x00009C40).unwrap();
476 data.write_u32::<BigEndian>(0x00020000).unwrap();
477 data.write(&file.0).unwrap();
478 data.write_u32::<BigEndian>(start as u32).unwrap();
479 data.write_u32::<BigEndian>(end as u32).unwrap();
480
481 session.send_packet(0x8, data);
482
483 channel
484 }
485
/// A chunk of file data received from the server.
struct PartialFileData {
    /// Position of the chunk within the file in bytes.
    offset: usize,
    /// The received bytes.
    data: Bytes,
}
490
/// Messages sent from the data receivers to `AudioFileFetch`.
enum ReceivedData {
    /// A measured response time in milliseconds.
    ResponseTimeMs(usize),
    /// A chunk of file data.
    Data(PartialFileData),
}
495
/// Future that receives the data of a single range request and forwards it to
/// `AudioFileFetch`.
struct AudioFileFetchDataReceiver {
    /// State shared with the reading side and the stream loader.
    shared: Arc<AudioFileShared>,
    /// Channel on which received data and response times are forwarded.
    file_data_tx: mpsc::UnboundedSender<ReceivedData>,
    /// Data stream of the request.
    data_rx: ChannelData,
    /// Start of the requested range; only used in log messages.
    initial_data_offset: usize,
    /// Length of the requested range; only used in log messages.
    initial_request_length: usize,
    /// File offset at which the next received chunk belongs.
    data_offset: usize,
    /// Number of requested bytes that have not been received yet.
    request_length: usize,
    /// Time the request was sent, used for the response time measurement.
    request_sent_time: Option<Instant>,
    /// Whether this receiver still has to report a response time.
    measure_ping_time: bool,
}
507
508 impl AudioFileFetchDataReceiver {
new( shared: Arc<AudioFileShared>, file_data_tx: mpsc::UnboundedSender<ReceivedData>, data_rx: ChannelData, data_offset: usize, request_length: usize, request_sent_time: Instant, ) -> AudioFileFetchDataReceiver509 fn new(
510 shared: Arc<AudioFileShared>,
511 file_data_tx: mpsc::UnboundedSender<ReceivedData>,
512 data_rx: ChannelData,
513 data_offset: usize,
514 request_length: usize,
515 request_sent_time: Instant,
516 ) -> AudioFileFetchDataReceiver {
517 let measure_ping_time = shared
518 .number_of_open_requests
519 .load(atomic::Ordering::SeqCst)
520 == 0;
521
522 shared
523 .number_of_open_requests
524 .fetch_add(1, atomic::Ordering::SeqCst);
525
526 AudioFileFetchDataReceiver {
527 shared: shared,
528 data_rx: data_rx,
529 file_data_tx: file_data_tx,
530 initial_data_offset: data_offset,
531 initial_request_length: request_length,
532 data_offset: data_offset,
533 request_length: request_length,
534 request_sent_time: Some(request_sent_time),
535 measure_ping_time: measure_ping_time,
536 }
537 }
538 }
539
540 impl AudioFileFetchDataReceiver {
finish(&mut self)541 fn finish(&mut self) {
542 if self.request_length > 0 {
543 let missing_range = Range::new(self.data_offset, self.request_length);
544
545 let mut download_status = self.shared.download_status.lock().unwrap();
546 download_status.requested.subtract_range(&missing_range);
547 self.shared.cond.notify_all();
548 }
549
550 self.shared
551 .number_of_open_requests
552 .fetch_sub(1, atomic::Ordering::SeqCst);
553 }
554 }
555
556 impl Future for AudioFileFetchDataReceiver {
557 type Item = ();
558 type Error = ();
559
poll(&mut self) -> Poll<(), ()>560 fn poll(&mut self) -> Poll<(), ()> {
561 loop {
562 match self.data_rx.poll() {
563 Ok(Async::Ready(Some(data))) => {
564 if self.measure_ping_time {
565 if let Some(request_sent_time) = self.request_sent_time {
566 let duration = Instant::now() - request_sent_time;
567 let duration_ms: u64;
568 if 0.001 * (duration.as_millis() as f64)
569 > MAXIMUM_ASSUMED_PING_TIME_SECONDS
570 {
571 duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64;
572 } else {
573 duration_ms = duration.as_millis() as u64;
574 }
575 let _ = self
576 .file_data_tx
577 .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize));
578 self.measure_ping_time = false;
579 }
580 }
581 let data_size = data.len();
582 let _ = self
583 .file_data_tx
584 .unbounded_send(ReceivedData::Data(PartialFileData {
585 offset: self.data_offset,
586 data: data,
587 }));
588 self.data_offset += data_size;
589 if self.request_length < data_size {
590 warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length);
591 self.request_length = 0;
592 } else {
593 self.request_length -= data_size;
594 }
595 if self.request_length == 0 {
596 self.finish();
597 return Ok(Async::Ready(()));
598 }
599 }
600 Ok(Async::Ready(None)) => {
601 if self.request_length > 0 {
602 warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length);
603 }
604 self.finish();
605 return Ok(Async::Ready(()));
606 }
607 Ok(Async::NotReady) => {
608 return Ok(Async::NotReady);
609 }
610 Err(ChannelError) => {
611 warn!(
612 "Error from channel for data receiver for range {} (+{}).",
613 self.initial_data_offset, self.initial_request_length
614 );
615 self.finish();
616 return Ok(Async::Ready(()));
617 }
618 }
619 }
620 }
621 }
622
/// Background task that issues range requests, writes received data to the
/// temporary file and keeps the shared download status up to date.
struct AudioFileFetch {
    /// Session used to send requests and to spawn data receivers.
    session: Session,
    /// State shared with the reading side and the data receivers.
    shared: Arc<AudioFileShared>,
    /// Temporary file the data is written to; taken when the download completes.
    output: Option<NamedTempFile>,

    /// Sender handed to every data receiver.
    file_data_tx: mpsc::UnboundedSender<ReceivedData>,
    /// Receiving end for data and response times from the data receivers.
    file_data_rx: mpsc::UnboundedReceiver<ReceivedData>,

    /// Commands from the reading side and from controllers.
    stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
    /// Used to hand over the completed file; taken when the download completes.
    complete_tx: Option<oneshot::Sender<NamedTempFile>>,
    /// The most recent response times (at most three) used for the ping time estimate.
    network_response_times_ms: Vec<usize>,
}
635
636 impl AudioFileFetch {
new( session: Session, shared: Arc<AudioFileShared>, initial_data_rx: ChannelData, initial_request_sent_time: Instant, initial_data_length: usize, output: NamedTempFile, stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>, complete_tx: oneshot::Sender<NamedTempFile>, ) -> AudioFileFetch637 fn new(
638 session: Session,
639 shared: Arc<AudioFileShared>,
640 initial_data_rx: ChannelData,
641 initial_request_sent_time: Instant,
642 initial_data_length: usize,
643
644 output: NamedTempFile,
645 stream_loader_command_rx: mpsc::UnboundedReceiver<StreamLoaderCommand>,
646 complete_tx: oneshot::Sender<NamedTempFile>,
647 ) -> AudioFileFetch {
648 let (file_data_tx, file_data_rx) = unbounded::<ReceivedData>();
649
650 {
651 let requested_range = Range::new(0, initial_data_length);
652 let mut download_status = shared.download_status.lock().unwrap();
653 download_status.requested.add_range(&requested_range);
654 }
655
656 let initial_data_receiver = AudioFileFetchDataReceiver::new(
657 shared.clone(),
658 file_data_tx.clone(),
659 initial_data_rx,
660 0,
661 initial_data_length,
662 initial_request_sent_time,
663 );
664
665 session.spawn(move |_| initial_data_receiver);
666
667 AudioFileFetch {
668 session: session,
669 shared: shared,
670 output: Some(output),
671
672 file_data_tx: file_data_tx,
673 file_data_rx: file_data_rx,
674
675 stream_loader_command_rx: stream_loader_command_rx,
676 complete_tx: Some(complete_tx),
677 network_response_times_ms: Vec::new(),
678 }
679 }
680
get_download_strategy(&mut self) -> DownloadStrategy681 fn get_download_strategy(&mut self) -> DownloadStrategy {
682 *(self.shared.download_strategy.lock().unwrap())
683 }
684
download_range(&mut self, mut offset: usize, mut length: usize)685 fn download_range(&mut self, mut offset: usize, mut length: usize) {
686 if length < MINIMUM_DOWNLOAD_SIZE {
687 length = MINIMUM_DOWNLOAD_SIZE;
688 }
689
690 // ensure the values are within the bounds and align them by 4 for the spotify protocol.
691 if offset >= self.shared.file_size {
692 return;
693 }
694
695 if length <= 0 {
696 return;
697 }
698
699 if offset + length > self.shared.file_size {
700 length = self.shared.file_size - offset;
701 }
702
703 if offset % 4 != 0 {
704 length += offset % 4;
705 offset -= offset % 4;
706 }
707
708 if length % 4 != 0 {
709 length += 4 - (length % 4);
710 }
711
712 let mut ranges_to_request = RangeSet::new();
713 ranges_to_request.add_range(&Range::new(offset, length));
714
715 let mut download_status = self.shared.download_status.lock().unwrap();
716
717 ranges_to_request.subtract_range_set(&download_status.downloaded);
718 ranges_to_request.subtract_range_set(&download_status.requested);
719
720 for range in ranges_to_request.iter() {
721 let (_headers, data) = request_range(
722 &self.session,
723 self.shared.file_id,
724 range.start,
725 range.length,
726 )
727 .split();
728
729 download_status.requested.add_range(range);
730
731 let receiver = AudioFileFetchDataReceiver::new(
732 self.shared.clone(),
733 self.file_data_tx.clone(),
734 data,
735 range.start,
736 range.length,
737 Instant::now(),
738 );
739
740 self.session.spawn(move |_| receiver);
741 }
742 }
743
pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize)744 fn pre_fetch_more_data(&mut self, bytes: usize, max_requests_to_send: usize) {
745 let mut bytes_to_go = bytes;
746 let mut requests_to_go = max_requests_to_send;
747
748 while bytes_to_go > 0 && requests_to_go > 0 {
749 // determine what is still missing
750 let mut missing_data = RangeSet::new();
751 missing_data.add_range(&Range::new(0, self.shared.file_size));
752 {
753 let download_status = self.shared.download_status.lock().unwrap();
754 missing_data.subtract_range_set(&download_status.downloaded);
755 missing_data.subtract_range_set(&download_status.requested);
756 }
757
758 // download data from after the current read position first
759 let mut tail_end = RangeSet::new();
760 let read_position = self.shared.read_position.load(atomic::Ordering::Relaxed);
761 tail_end.add_range(&Range::new(
762 read_position,
763 self.shared.file_size - read_position,
764 ));
765 let tail_end = tail_end.intersection(&missing_data);
766
767 if !tail_end.is_empty() {
768 let range = tail_end.get_range(0);
769 let offset = range.start;
770 let length = min(range.length, bytes_to_go);
771 self.download_range(offset, length);
772 requests_to_go -= 1;
773 bytes_to_go -= length;
774 } else if !missing_data.is_empty() {
775 // ok, the tail is downloaded, download something fom the beginning.
776 let range = missing_data.get_range(0);
777 let offset = range.start;
778 let length = min(range.length, bytes_to_go);
779 self.download_range(offset, length);
780 requests_to_go -= 1;
781 bytes_to_go -= length;
782 } else {
783 return;
784 }
785 }
786 }
787
poll_file_data_rx(&mut self) -> Poll<(), ()>788 fn poll_file_data_rx(&mut self) -> Poll<(), ()> {
789 loop {
790 match self.file_data_rx.poll() {
791 Ok(Async::Ready(None)) => {
792 return Ok(Async::Ready(()));
793 }
794 Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => {
795 trace!("Ping time estimated as: {} ms.", response_time_ms);
796
797 // record the response time
798 self.network_response_times_ms.push(response_time_ms);
799
800 // prune old response times. Keep at most three.
801 while self.network_response_times_ms.len() > 3 {
802 self.network_response_times_ms.remove(0);
803 }
804
805 // stats::median is experimental. So we calculate the median of up to three ourselves.
806 let ping_time_ms: usize = match self.network_response_times_ms.len() {
807 1 => self.network_response_times_ms[0] as usize,
808 2 => {
809 ((self.network_response_times_ms[0]
810 + self.network_response_times_ms[1])
811 / 2) as usize
812 }
813 3 => {
814 let mut times = self.network_response_times_ms.clone();
815 times.sort();
816 times[1]
817 }
818 _ => unreachable!(),
819 };
820
821 // store our new estimate for everyone to see
822 self.shared
823 .ping_time_ms
824 .store(ping_time_ms, atomic::Ordering::Relaxed);
825 }
826 Ok(Async::Ready(Some(ReceivedData::Data(data)))) => {
827 self.output
828 .as_mut()
829 .unwrap()
830 .seek(SeekFrom::Start(data.offset as u64))
831 .unwrap();
832 self.output
833 .as_mut()
834 .unwrap()
835 .write_all(data.data.as_ref())
836 .unwrap();
837
838 let mut full = false;
839
840 {
841 let mut download_status = self.shared.download_status.lock().unwrap();
842
843 let received_range = Range::new(data.offset, data.data.len());
844 download_status.downloaded.add_range(&received_range);
845 self.shared.cond.notify_all();
846
847 if download_status.downloaded.contained_length_from_value(0)
848 >= self.shared.file_size
849 {
850 full = true;
851 }
852
853 drop(download_status);
854 }
855
856 if full {
857 self.finish();
858 return Ok(Async::Ready(()));
859 }
860 }
861 Ok(Async::NotReady) => {
862 return Ok(Async::NotReady);
863 }
864 Err(()) => unreachable!(),
865 }
866 }
867 }
868
poll_stream_loader_command_rx(&mut self) -> Poll<(), ()>869 fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> {
870 loop {
871 match self.stream_loader_command_rx.poll() {
872 Ok(Async::Ready(None)) => {
873 return Ok(Async::Ready(()));
874 }
875 Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => {
876 self.download_range(request.start, request.length);
877 }
878 Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => {
879 *(self.shared.download_strategy.lock().unwrap()) =
880 DownloadStrategy::RandomAccess();
881 }
882 Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => {
883 *(self.shared.download_strategy.lock().unwrap()) =
884 DownloadStrategy::Streaming();
885 }
886 Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => {
887 return Ok(Async::Ready(()));
888 }
889 Ok(Async::NotReady) => return Ok(Async::NotReady),
890 Err(()) => unreachable!(),
891 }
892 }
893 }
894
finish(&mut self)895 fn finish(&mut self) {
896 let mut output = self.output.take().unwrap();
897 let complete_tx = self.complete_tx.take().unwrap();
898
899 output.seek(SeekFrom::Start(0)).unwrap();
900 let _ = complete_tx.send(output);
901 }
902 }
903
904 impl Future for AudioFileFetch {
905 type Item = ();
906 type Error = ();
907
poll(&mut self) -> Poll<(), ()>908 fn poll(&mut self) -> Poll<(), ()> {
909 match self.poll_stream_loader_command_rx() {
910 Ok(Async::NotReady) => (),
911 Ok(Async::Ready(_)) => {
912 return Ok(Async::Ready(()));
913 }
914 Err(()) => unreachable!(),
915 }
916
917 match self.poll_file_data_rx() {
918 Ok(Async::NotReady) => (),
919 Ok(Async::Ready(_)) => {
920 return Ok(Async::Ready(()));
921 }
922 Err(()) => unreachable!(),
923 }
924
925 if let DownloadStrategy::Streaming() = self.get_download_strategy() {
926 let number_of_open_requests = self
927 .shared
928 .number_of_open_requests
929 .load(atomic::Ordering::SeqCst);
930 let max_requests_to_send =
931 MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests);
932
933 if max_requests_to_send > 0 {
934 let bytes_pending: usize = {
935 let download_status = self.shared.download_status.lock().unwrap();
936 download_status
937 .requested
938 .minus(&download_status.downloaded)
939 .len()
940 };
941
942 let ping_time_seconds =
943 0.001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
944 let download_rate = self.session.channel().get_download_rate_estimate();
945
946 let desired_pending_bytes = max(
947 (PREFETCH_THRESHOLD_FACTOR
948 * ping_time_seconds
949 * self.shared.stream_data_rate as f64) as usize,
950 (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64)
951 as usize,
952 );
953
954 if bytes_pending < desired_pending_bytes {
955 self.pre_fetch_more_data(
956 desired_pending_bytes - bytes_pending,
957 max_requests_to_send,
958 );
959 }
960 }
961 }
962
963 return Ok(Async::NotReady);
964 }
965 }
966
967 impl Read for AudioFileStreaming {
read(&mut self, output: &mut [u8]) -> io::Result<usize>968 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
969 let offset = self.position as usize;
970
971 if offset >= self.shared.file_size {
972 return Ok(0);
973 }
974
975 let length = min(output.len(), self.shared.file_size - offset);
976
977 let length_to_request = match *(self.shared.download_strategy.lock().unwrap()) {
978 DownloadStrategy::RandomAccess() => length,
979 DownloadStrategy::Streaming() => {
980 // Due to the read-ahead stuff, we potentially request more than the actual reqeust demanded.
981 let ping_time_seconds =
982 0.0001 * self.shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64;
983
984 let length_to_request = length
985 + max(
986 (READ_AHEAD_DURING_PLAYBACK_SECONDS * self.shared.stream_data_rate as f64)
987 as usize,
988 (READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS
989 * ping_time_seconds
990 * self.shared.stream_data_rate as f64) as usize,
991 );
992 min(length_to_request, self.shared.file_size - offset)
993 }
994 };
995
996 let mut ranges_to_request = RangeSet::new();
997 ranges_to_request.add_range(&Range::new(offset, length_to_request));
998
999 let mut download_status = self.shared.download_status.lock().unwrap();
1000 ranges_to_request.subtract_range_set(&download_status.downloaded);
1001 ranges_to_request.subtract_range_set(&download_status.requested);
1002
1003 for range in ranges_to_request.iter() {
1004 self.stream_loader_command_tx
1005 .unbounded_send(StreamLoaderCommand::Fetch(range.clone()))
1006 .unwrap();
1007 }
1008
1009 if length == 0 {
1010 return Ok(0);
1011 }
1012
1013 let mut download_message_printed = false;
1014 while !download_status.downloaded.contains(offset) {
1015 if let DownloadStrategy::Streaming() = *self.shared.download_strategy.lock().unwrap() {
1016 if !download_message_printed {
1017 debug!("Stream waiting for download of file position {}. Downloaded ranges: {}. Pending ranges: {}", offset, download_status.downloaded, download_status.requested.minus(&download_status.downloaded));
1018 download_message_printed = true;
1019 }
1020 }
1021 download_status = self
1022 .shared
1023 .cond
1024 .wait_timeout(download_status, Duration::from_millis(1000))
1025 .unwrap()
1026 .0;
1027 }
1028 let available_length = download_status
1029 .downloaded
1030 .contained_length_from_value(offset);
1031 assert!(available_length > 0);
1032 drop(download_status);
1033
1034 self.position = self.read_file.seek(SeekFrom::Start(offset as u64)).unwrap();
1035 let read_len = min(length, available_length);
1036 let read_len = self.read_file.read(&mut output[..read_len])?;
1037
1038 if download_message_printed {
1039 debug!(
1040 "Read at postion {} completed. {} bytes returned, {} bytes were requested.",
1041 offset,
1042 read_len,
1043 output.len()
1044 );
1045 }
1046
1047 self.position += read_len as u64;
1048 self.shared
1049 .read_position
1050 .store(self.position as usize, atomic::Ordering::Relaxed);
1051
1052 return Ok(read_len);
1053 }
1054 }
1055
1056 impl Seek for AudioFileStreaming {
seek(&mut self, pos: SeekFrom) -> io::Result<u64>1057 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1058 self.position = self.read_file.seek(pos)?;
1059 // Do not seek past EOF
1060 self.shared
1061 .read_position
1062 .store(self.position as usize, atomic::Ordering::Relaxed);
1063 Ok(self.position)
1064 }
1065 }
1066
1067 impl Read for AudioFile {
read(&mut self, output: &mut [u8]) -> io::Result<usize>1068 fn read(&mut self, output: &mut [u8]) -> io::Result<usize> {
1069 match *self {
1070 AudioFile::Cached(ref mut file) => file.read(output),
1071 AudioFile::Streaming(ref mut file) => file.read(output),
1072 }
1073 }
1074 }
1075
1076 impl Seek for AudioFile {
seek(&mut self, pos: SeekFrom) -> io::Result<u64>1077 fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
1078 match *self {
1079 AudioFile::Cached(ref mut file) => file.seek(pos),
1080 AudioFile::Streaming(ref mut file) => file.seek(pos),
1081 }
1082 }
1083 }
1084