1 // Ogg decoder and encoder written in Rust
2 //
3 // Copyright (c) 2016-2017 est31 <MTest31@outlook.com>
4 // and contributors. All rights reserved.
5 // Redistribution or use only under the terms
6 // specified in the LICENSE file attached to this
7 // source distribution.
8 
9 /*!
10 Reading logic
11 */
12 
13 use std::error;
14 use std::io;
15 use std::io::{Cursor, Read, Write, SeekFrom, Error, ErrorKind};
16 use byteorder::{ReadBytesExt, LittleEndian};
17 use std::collections::HashMap;
18 use std::collections::hash_map::Entry;
19 use std::fmt::{Display, Formatter, Error as FmtError};
20 use std::mem::replace;
21 use crc::vorbis_crc32_update;
22 use Packet;
23 use std::io::Seek;
24 
/// Error that can be raised when decoding an Ogg transport.
#[derive(Debug)]
pub enum OggReadError {
	/// The capture pattern for a new page was not found
	/// where one was expected.
	NoCapturePatternFound,
	/// Invalid stream structure version, with the given one
	/// attached.
	InvalidStreamStructVer(u8),
	/// Mismatch of the hash value with (expected, calculated) value.
	HashMismatch(u32, u32),
	/// I/O error occurred.
	ReadError(io::Error),
	/// Some constraint required by the spec was not met.
	InvalidData,
}

impl OggReadError {
	/// Returns a short, static, human readable summary of the error.
	///
	/// Both the `Display` implementation and `Error::description`
	/// use this function, so that they always agree.
	fn description_str(&self) -> &str {
		match *self {
			OggReadError::NoCapturePatternFound => "No Ogg capture pattern found",
			OggReadError::InvalidStreamStructVer(_) =>
				"A non zero stream structure version was passed",
			OggReadError::HashMismatch(_, _) => "CRC32 hash mismatch",
			OggReadError::ReadError(_) => "I/O error",
			OggReadError::InvalidData => "Constraint violated",
		}
	}
}

impl error::Error for OggReadError {
	fn description(&self) -> &str {
		self.description_str()
	}

	/// Legacy accessor for the underlying error, kept for callers that
	/// still use it. Yields the same error as `source`.
	fn cause(&self) -> Option<&dyn error::Error> {
		error::Error::source(self)
	}

	/// Returns the wrapped I/O error for the `ReadError` variant,
	/// and `None` for all other variants.
	///
	/// Without this, code that walks the error chain via `source`
	/// would never see the underlying I/O error.
	fn source(&self) -> Option<&(dyn error::Error + 'static)> {
		match *self {
			OggReadError::ReadError(ref err) => Some(err),
			OggReadError::NoCapturePatternFound |
			OggReadError::InvalidStreamStructVer(_) |
			OggReadError::HashMismatch(_, _) |
			OggReadError::InvalidData => None,
		}
	}
}

impl Display for OggReadError {
	/// Writes the short summary of the error. Payloads of the variants
	/// (version, hash values, I/O error) are not part of the output.
	fn fmt(&self, fmt :&mut Formatter) -> Result<(), FmtError> {
		fmt.write_str(self.description_str())
	}
}

impl From<io::Error> for OggReadError {
	/// Wraps the I/O error into the `ReadError` variant.
	fn from(err :io::Error) -> OggReadError {
		OggReadError::ReadError(err)
	}
}
79 
/// Information about an OGG page that is shared between multiple places
/// (it is embedded both in `PageParser` and in `PageInfo`).
struct PageBaseInfo {
	/// `true` if the first packet is continued from the page before,
	/// `false` if it's a "fresh" one
	starts_with_continued :bool,
	/// `true` if this page is the first one in the logical bitstream
	first_page :bool,
	/// `true` if this page is the last one in the logical bitstream
	last_page :bool,
	/// Absolute granule position. The codec defines further meaning.
	absgp :u64,
	/// Page counter
	sequence_num :u32,
	/// Packet information:
	/// the index is the number of the packet inside the page,
	/// the tuple is (offset, length) of the packet inside the page body.
	/// If `ends_with_continued` is `true`, the last element contains
	/// information about the continued packet.
	packet_positions :Vec<(u16,u16)>,
	/// `true` if the last packet is continued in subsequent page(s),
	/// `false` if the last packet has a segment of length < 255 inside this page
	ends_with_continued :bool,
}
102 
/// Internal helper struct for PacketReader state
///
/// One instance is kept per logical stream (see `BasePacketReader::page_infos`).
struct PageInfo {
	/// Basic information about the last read page
	bi :PageBaseInfo,
	/// The index of the first "unread" packet
	packet_idx :u8,
	/// Contains the packet data (the body of the last read page)
	page_body :Vec<u8>,

	/// If there is a residue from previous pages in terms of a packet spanning multiple
	/// pages, this field contains it. Having this Vec<Vec<u8>> and
	/// not Vec<u8> ensures to give us O(n) complexity, not O(n^2)
	/// for `n` as number of pages that the packet is contained in.
	last_overlap_pck :Vec<Vec<u8>>,
}
118 
119 impl PageInfo {
120 	/// Returns `true` if the first "unread" packet is the first one
121 	/// in the page, `false` otherwise.
is_first_pck_in_pg(&self) -> bool122 	fn is_first_pck_in_pg(&self) -> bool {
123 		return self.packet_idx == 0;
124 	}
125 	/// Returns `true` if the first "unread" packet is the last one
126 	/// in the page, `false` otherwise.
127 	/// If the first "unread" packet isn't completed in this page
128 	/// (spans page borders), this returns `false`.
is_last_pck_in_pg(&self) -> bool129 	fn is_last_pck_in_pg(&self) -> bool {
130 		return (self.packet_idx + 1 + (self.bi.ends_with_continued as u8)) as usize
131 			== self.bi.packet_positions.len();
132 	}
133 }
134 
/// Contains a fully parsed OGG page.
///
/// It wraps the `PageParser` that produced it; instances are returned
/// by `PageParser::parse_packet_data`.
pub struct OggPage(PageParser);
137 
138 impl OggPage {
139 	/// Returns whether there is an ending packet in the page
has_packet_end(&self) -> bool140 	fn has_packet_end(&self) -> bool {
141 		(self.0.bi.packet_positions.len() -
142 			self.0.bi.ends_with_continued as usize) > 0
143 	}
144 	/// Returns whether there is a packet that both
145 	/// starts and ends inside the page
has_whole_packet(&self) -> bool146 	fn has_whole_packet(&self) -> bool {
147 		self.0.bi.packet_positions.len().saturating_sub(
148 			self.0.bi.ends_with_continued as usize +
149 			self.0.bi.starts_with_continued as usize) > 0
150 	}
151 	/// Returns whether there is a starting packet in the page
has_packet_start(&self) -> bool152 	fn has_packet_start(&self) -> bool {
153 		(self.0.bi.packet_positions.len() -
154 			self.0.bi.starts_with_continued as usize) > 0
155 	}
156 }
157 
/**
Helper struct for parsing pages

It's created using the `new` function and then it's fed more data via the `parse_segments`
and `parse_packet_data` functions, each called exactly once and in that precise order.

Then later code uses the `OggPage` returned by the `parse_packet_data` function.
*/
pub struct PageParser {
	// Members packet_positions, ends_with_continued and packet_count
	// get populated after segments have been parsed
	bi :PageBaseInfo,

	/// Serial number of the logical stream the page belongs to
	stream_serial :u32,
	/// Checksum as stored in the page header
	checksum :u32,
	/// The 27 header bytes as passed to `new`; needed for the checksum calculation
	header_buf: [u8; 27],
	/// Number of packet ending segments
	packet_count :u16, // Gets populated after segments have been parsed
	/// after segments have been parsed, this contains the segments buffer,
	/// after the packet data have been read, this contains the packets buffer.
	segments_or_packets_buf :Vec<u8>,
}
180 
181 impl PageParser {
182 	/// Creates a new Page parser
183 	///
184 	/// The `header_buf` param contains the first 27 bytes of a new OGG page.
185 	/// Determining when one begins is your responsibility. Usually they
186 	/// begin directly after the end of a previous OGG page, but
187 	/// after you've performed a seek you might end up within the middle of a page
188 	/// and need to recapture.
189 	///
190 	/// Returns a page parser, and the requested size of the segments array.
191 	/// You should allocate and fill such an array, in order to pass it to the `parse_segments`
192 	/// function.
new(header_buf :[u8; 27]) -> Result<(PageParser, usize), OggReadError>193 	pub fn new(header_buf :[u8; 27]) -> Result<(PageParser, usize), OggReadError> {
194 		let mut header_rdr = Cursor::new(header_buf);
195 		header_rdr.set_position(4);
196 		let stream_structure_version = tri!(header_rdr.read_u8());
197 		if stream_structure_version != 0 {
198 			tri!(Err(OggReadError::InvalidStreamStructVer(stream_structure_version)));
199 		}
200 		let header_type_flag = header_rdr.read_u8().unwrap();
201 		let stream_serial;
202 
203 		Ok((PageParser {
204 			bi : PageBaseInfo {
205 				starts_with_continued : header_type_flag & 0x01u8 != 0,
206 				first_page : header_type_flag & 0x02u8 != 0,
207 				last_page : header_type_flag & 0x04u8 != 0,
208 				absgp : header_rdr.read_u64::<LittleEndian>().unwrap(),
209 				sequence_num : {
210 					stream_serial = header_rdr.read_u32::<LittleEndian>().unwrap();
211 					header_rdr.read_u32::<LittleEndian>().unwrap()
212 				},
213 				packet_positions : Vec::new(),
214 				ends_with_continued : false,
215 			},
216 			stream_serial,
217 			checksum : header_rdr.read_u32::<LittleEndian>().unwrap(),
218 			header_buf,
219 			packet_count : 0,
220 			segments_or_packets_buf :Vec::new(),
221 		},
222 			// Number of page segments
223 			header_rdr.read_u8().unwrap() as usize
224 		))
225 	}
226 
227 	/// Parses the segments buffer, and returns the requested size
228 	/// of the packets content array.
229 	///
230 	/// You should allocate and fill such an array, in order to pass it to the `parse_packet_data`
231 	/// function.
parse_segments(&mut self, segments_buf :Vec<u8>) -> usize232 	pub fn parse_segments(&mut self, segments_buf :Vec<u8>) -> usize {
233 		let mut page_siz :u16 = 0; // Size of the page's body
234 		// Whether our page ends with a continued packet
235 		self.bi.ends_with_continued = self.bi.starts_with_continued;
236 
237 		// First run: get the number of packets,
238 		// whether the page ends with a continued packet,
239 		// and the size of the page's body
240 		for val in &segments_buf {
241 			page_siz += *val as u16;
242 			// Increment by 1 if val < 255, otherwise by 0
243 			self.packet_count += (*val < 255) as u16;
244 			self.bi.ends_with_continued = !(*val < 255);
245 		}
246 
247 		let mut packets = Vec::with_capacity(self.packet_count as usize
248 			+ self.bi.ends_with_continued as usize);
249 		let mut cur_packet_siz :u16 = 0;
250 		let mut cur_packet_offs :u16 = 0;
251 
252 		// Second run: get the offsets of the packets
253 		// Not that we need it right now, but it's much more fun this way, am I right
254 		for val in &segments_buf {
255 			cur_packet_siz += *val as u16;
256 			if *val < 255 {
257 				packets.push((cur_packet_offs, cur_packet_siz));
258 				cur_packet_offs += cur_packet_siz;
259 				cur_packet_siz = 0;
260 			}
261 		}
262 		if self.bi.ends_with_continued {
263 			packets.push((cur_packet_offs, cur_packet_siz));
264 		}
265 
266 		self.bi.packet_positions = packets;
267 		self.segments_or_packets_buf = segments_buf;
268 		page_siz as usize
269 	}
270 
271 	/// Parses the packets data and verifies the checksum.
272 	///
273 	/// Returns an `OggPage` to be used by later code.
parse_packet_data(mut self, packet_data :Vec<u8>) -> Result<OggPage, OggReadError>274 	pub fn parse_packet_data(mut self, packet_data :Vec<u8>) ->
275 			Result<OggPage, OggReadError> {
276 		// Now to hash calculation.
277 		// 1. Clear the header buffer
278 		self.header_buf[22] = 0;
279 		self.header_buf[23] = 0;
280 		self.header_buf[24] = 0;
281 		self.header_buf[25] = 0;
282 
283 		// 2. Calculate the hash
284 		let mut hash_calculated :u32;
285 		hash_calculated = vorbis_crc32_update(0, &self.header_buf);
286 		hash_calculated = vorbis_crc32_update(hash_calculated,
287 			&self.segments_or_packets_buf);
288 		hash_calculated = vorbis_crc32_update(hash_calculated, &packet_data);
289 
290 		// 3. Compare to the extracted one
291 		if self.checksum != hash_calculated {
292 			// Do not verify checksum when the decoder is being fuzzed.
293 			// This allows random input from fuzzers reach decoding code that's actually interesting,
294 			// instead of being rejected early due to checksum mismatch.
295 			if !cfg!(fuzzing) {
296 				tri!(Err(OggReadError::HashMismatch(self.checksum, hash_calculated)));
297 			}
298 		}
299 		self.segments_or_packets_buf = packet_data;
300 		Ok(OggPage(self))
301 	}
302 }
303 
/**
Low level struct for reading from an Ogg stream.

Note that most times you'll want the higher level `PacketReader` struct.

It takes care of most of the internal parsing and logic, you
will only have to take care of handing over your data.

Essentially, it manages a cache of packet data for each logical
bitstream, and when the cache of every logical bitstream is empty,
it asks for a fresh page. You will then need to feed the struct
one via the `push_page` function.

All functions on this struct are async ready.
They get their data fed, instead of calling and blocking
in order to get it.
*/
pub struct BasePacketReader {
	// TODO the hashmap plus the set is perhaps smart ass perfect design but could be made more performant I guess...
	// I mean: in > 99% of all cases we'll just have one or two streams.
	// AND: their setup changes only very rarely.

	/// Contains info about all logical streams that had a page pushed
	/// since construction or the last seek, with the stream serial as key.
	page_infos :HashMap<u32, PageInfo>,

	/// Contains the stream_serial of the stream that contains some unprocessed packet data.
	/// There is always <= 1, bc if there is one, no new pages will be read, so there is no chance for a second to be added
	/// None if there is no such stream and one has to read a new page.
	stream_with_stuff :Option<u32>,

	// Bool that is set to true when a seek of the stream has occurred.
	// This helps validator code to decide whether to accept certain strange data.
	has_seeked :bool,
}
338 
339 impl BasePacketReader {
340 	/// Constructs a new blank `BasePacketReader`.
341 	///
342 	/// You can feed it data using the `push_page` function, and
343 	/// obtain data using the `read_packet` function.
new() -> Self344 	pub fn new() -> Self {
345 		BasePacketReader { page_infos: HashMap::new(),
346 			stream_with_stuff: None, has_seeked: false }
347 	}
348 	/// Extracts a packet from the cache, if the cache contains valid packet data,
349 	/// otherwise it returns `None`.
350 	///
351 	/// If this function returns `None`, you'll need to add a page to the cache
352 	/// by using the `push_page` function.
read_packet(&mut self) -> Option<Packet>353 	pub fn read_packet(&mut self) -> Option<Packet> {
354 		if self.stream_with_stuff == None {
355 			return None;
356 		}
357 		let str_serial :u32 = self.stream_with_stuff.unwrap();
358 		let pg_info = self.page_infos.get_mut(&str_serial).unwrap();
359 		let (offs, len) = pg_info.bi.packet_positions[pg_info.packet_idx as usize];
360 		// If there is a continued packet, and we are at the start right now,
361 		// and we actually have its end in the current page, glue it together.
362 		let need_to_glue = pg_info.packet_idx == 0 &&
363 				pg_info.bi.starts_with_continued &&
364 				!(pg_info.bi.ends_with_continued && pg_info.bi.packet_positions.len() == 1);
365 		let packet_content :Vec<u8> = if need_to_glue {
366 			// First find out the size of our spanning packet
367 			let mut siz :usize = 0;
368 			for pck in pg_info.last_overlap_pck.iter() {
369 				siz += pck.len();
370 			}
371 			siz += len as usize;
372 			let mut cont :Vec<u8> = Vec::with_capacity(siz);
373 
374 			// Then do the copying
375 			for pck in pg_info.last_overlap_pck.iter() {
376 				cont.write_all(pck).unwrap();
377 			}
378 			// Now reset the overlap container again
379 			pg_info.last_overlap_pck = Vec::new();
380 			cont.write_all(&pg_info.page_body[offs as usize .. (offs + len) as usize]).unwrap();
381 
382 			cont
383 		} else {
384 			let mut cont :Vec<u8> = Vec::with_capacity(len as usize);
385 			// TODO The copy below is totally unneccessary. It is only needed so that we don't have to carry around the old Vec's.
386 			// TODO get something like the shared_slice crate for RefCells, so that we can also have mutable data, shared through
387 			// slices.
388 			let cont_slice :&[u8] = &pg_info.page_body[offs as usize .. (offs + len) as usize];
389 			cont.write_all(cont_slice).unwrap();
390 			cont
391 		};
392 
393 		let first_pck_in_pg = pg_info.is_first_pck_in_pg();
394 		let first_pck_overall = pg_info.bi.first_page && first_pck_in_pg;
395 
396 		let last_pck_in_pg = pg_info.is_last_pck_in_pg();
397 		let last_pck_overall = pg_info.bi.last_page && last_pck_in_pg;
398 
399 		// Update the last read index.
400 		pg_info.packet_idx += 1;
401 		// Set stream_with_stuff to None so that future packet reads
402 		// yield a page read first
403 		if last_pck_in_pg {
404 			self.stream_with_stuff = None;
405 		}
406 
407 		return Some(Packet {
408 			data: packet_content,
409 			first_packet_pg: first_pck_in_pg,
410 			first_packet_stream: first_pck_overall,
411 			last_packet_pg: last_pck_in_pg,
412 			last_packet_stream: last_pck_overall,
413 			absgp_page: pg_info.bi.absgp,
414 			stream_serial: str_serial,
415 		});
416 	}
417 
418 	/// Pushes a given Ogg page, updating the internal structures
419 	/// with its contents.
420 	///
421 	/// If you want the code to function properly, you should first call
422 	/// `parse_segments`, then `parse_packet_data` on a `PageParser`
423 	/// before passing the resulting `OggPage` to this function.
push_page(&mut self, page :OggPage) -> Result<(), OggReadError>424 	pub fn push_page(&mut self, page :OggPage) -> Result<(), OggReadError> {
425 		let mut pg_prs = page.0;
426 		match self.page_infos.entry(pg_prs.stream_serial) {
427 			Entry::Occupied(mut o) => {
428 				let inf = o.get_mut();
429 				if pg_prs.bi.first_page {
430 					tri!(Err(OggReadError::InvalidData));
431 				}
432 				if pg_prs.bi.starts_with_continued != inf.bi.ends_with_continued {
433 					if !self.has_seeked {
434 						tri!(Err(OggReadError::InvalidData));
435 					} else {
436 						// If we have seeked, we are more tolerant here,
437 						// and just drop the continued packet's content.
438 
439 						inf.last_overlap_pck.clear();
440 						if pg_prs.bi.starts_with_continued {
441 							pg_prs.bi.packet_positions.remove(0);
442 							if pg_prs.packet_count != 0 {
443 								// Decrease packet count by one. Normal case.
444 								pg_prs.packet_count -= 1;
445 							} else {
446 								// If the packet count is 0, this means
447 								// that we start and end with the same continued packet.
448 								// So now as we ignore that packet, we must clear the
449 								// ends_with_continued state as well.
450 								pg_prs.bi.ends_with_continued = false;
451 							}
452 						}
453 					}
454 				} else if pg_prs.bi.starts_with_continued {
455 					// Remember the packet at the end so that it can be glued together once
456 					// we encounter the next segment with length < 255 (doesnt have to be in this page)
457 					let (offs, len) = inf.bi.packet_positions[inf.packet_idx as usize];
458 					if len as usize != inf.page_body.len() {
459 						let mut tmp = Vec::with_capacity(len as usize);
460 						tmp.write_all(&inf.page_body[offs as usize .. (offs + len) as usize]).unwrap();
461 						inf.last_overlap_pck.push(tmp);
462 					} else {
463 						// Little optimisation: don't copy if not neccessary
464 						inf.last_overlap_pck.push(replace(&mut inf.page_body, vec![0;0]));
465 					}
466 
467 				}
468 				inf.bi = pg_prs.bi;
469 				inf.packet_idx = 0;
470 				inf.page_body = pg_prs.segments_or_packets_buf;
471 			},
472 			Entry::Vacant(v) => {
473 				if !self.has_seeked {
474 					if !pg_prs.bi.first_page || pg_prs.bi.starts_with_continued {
475 						// If we haven't seeked, this is an error.
476 						tri!(Err(OggReadError::InvalidData));
477 					}
478 				} else {
479 					if !pg_prs.bi.first_page {
480 						// we can just ignore this.
481 					}
482 					if pg_prs.bi.starts_with_continued {
483 						// Ignore the continued packet's content.
484 						// This is a normal occurence if we have just seeked.
485 						pg_prs.bi.packet_positions.remove(0);
486 						if pg_prs.packet_count != 0 {
487 							// Decrease packet count by one. Normal case.
488 							pg_prs.packet_count -= 1;
489 						} else {
490 							// If the packet count is 0, this means
491 							// that we start and end with the same continued packet.
492 							// So now as we ignore that packet, we must clear the
493 							// ends_with_continued state as well.
494 							pg_prs.bi.ends_with_continued = false;
495 						}
496 						// Not actually needed, but good for consistency
497 						pg_prs.bi.starts_with_continued = false;
498 					}
499 				}
500 				v.insert(PageInfo {
501 					bi : pg_prs.bi,
502 					packet_idx: 0,
503 					page_body: pg_prs.segments_or_packets_buf,
504 					last_overlap_pck: Vec::new(),
505 				});
506 			},
507 		}
508 		let pg_has_stuff :bool = pg_prs.packet_count > 0;
509 
510 		if pg_has_stuff {
511 			self.stream_with_stuff = Some(pg_prs.stream_serial);
512 		} else {
513 			self.stream_with_stuff = None;
514 		}
515 
516 		return Ok(());
517 	}
518 
519 	/// Reset the internal state after a seek
520 	///
521 	/// It flushes the cache so that no partial data is left inside.
522 	/// It also tells the parsing logic to expect little inconsistencies
523 	/// due to the read position not being at the start.
update_after_seek(&mut self)524 	pub fn update_after_seek(&mut self) {
525 		self.stream_with_stuff = None;
526 		self.page_infos = HashMap::new();
527 		self.has_seeked = true;
528 	}
529 }
530 
/// Internal state of the `UntilPageHeaderReader`
#[derive(Clone, Copy)]
enum UntilPageHeaderReaderMode {
	/// The capture pattern has not been found yet
	Searching,
	/// The capture pattern has been found, but the given number
	/// of header bytes still has to be read
	FoundWithNeeded(u8),
	/// More than the header has been read, a seek relative to the
	/// current position by the given offset is needed
	SeekNeeded(i32),
	/// The header is complete and can be obtained via `into_header`
	Found,
}
538 
/// Tells the user of the `UntilPageHeaderReader` what to do next
enum UntilPageHeaderResult {
	/// The end of the stream was reached without any data having been read
	Eof,
	/// The header is complete
	Found,
	/// `do_read` has to be called
	ReadNeeded,
	/// `do_seek` has to be called
	SeekNeeded,
}
545 
/// Helper that searches for the next page header and collects its first 27 bytes.
///
/// It doesn't read or seek on its own initiative: the `do_read` and `do_seek`
/// functions each perform one operation and return which one is needed next.
struct UntilPageHeaderReader {
	/// The current state of the search
	mode :UntilPageHeaderReaderMode,
	/// Capture pattern offset. Needed so that if we only partially
	/// recognized the capture pattern, we later on only check the
	/// remaining part.
	cpt_of :u8,
	/// The return buffer.
	ret_buf :[u8; 27],
	/// Total number of bytes that have been read so far
	read_amount :usize,
}
556 
557 impl UntilPageHeaderReader {
new() -> Self558 	pub fn new() -> Self {
559 		UntilPageHeaderReader {
560 			mode : UntilPageHeaderReaderMode::Searching,
561 			cpt_of : 0,
562 			ret_buf : [0; 27],
563 			read_amount : 0,
564 		}
565 	}
566 	/// Returns Some(off), where off is the offset of the last byte
567 	/// of the capture pattern if it's found, None if the capture pattern
568 	/// is not inside the passed slice.
569 	///
570 	/// Changes the capture pattern offset accordingly
check_arr(&mut self, arr :&[u8]) -> Option<usize>571 	fn check_arr(&mut self, arr :&[u8]) -> Option<usize> {
572 		for (i, ch) in arr.iter().enumerate() {
573 			match *ch {
574 				b'O' => self.cpt_of = 1,
575 				b'g' if self.cpt_of == 1 || self.cpt_of == 2 => self.cpt_of += 1,
576 				b'S' if self.cpt_of == 3 => return Some(i),
577 				_ => self.cpt_of = 0,
578 			}
579 		}
580 		return None;
581 	}
	/// Does exactly one read on `rdr`, and advances the search
	/// for the page header with the data obtained.
	///
	/// On success, the returned value tells what to do next:
	///
	/// * `Eof`: the read has returned no data, and no data has been read before.
	/// * `Found`: the 27 header bytes are complete and can be extracted with `into_header`.
	/// * `ReadNeeded`: more data is needed, call `do_read` again.
	/// * `SeekNeeded`: more data than the header has been read, call `do_seek`.
	///
	/// A `NoCapturePatternFound` error is raised if the read returns no data
	/// after some data has already been read, or once more than 150 KiB have
	/// been read in total. Errors of the read itself are passed through `tri!`
	/// (presumably they get propagated to the caller).
	pub fn do_read<R :Read>(&mut self, mut rdr :R)
			-> Result<UntilPageHeaderResult, OggReadError> {
		use self::UntilPageHeaderReaderMode::*;
		use self::UntilPageHeaderResult as Res;
		// The array's size is freely choosable, but must be > 27,
		// and must well fit into an i32 (needs to be stored in SeekNeeded)
		let mut buf :[u8; 1024] = [0; 1024];

		let rd_len = tri!(rdr.read(if self.read_amount < 27 {
			// This is an optimisation for the most likely case:
			// the next page directly follows the current read position.
			// Then it would be a waste to read more than the needed amount.
			&mut buf[0 .. 27 - self.read_amount]
		} else {
			match self.mode {
				Searching => &mut buf,
				FoundWithNeeded(amount) => &mut buf[0 .. amount as usize],
				// In these two modes there is nothing to read,
				// so we return before the read is performed.
				SeekNeeded(_) => return Ok(Res::SeekNeeded),
				Found => return Ok(Res::Found),
			}
		}));

		if rd_len == 0 {
			// Reached EOF.
			if self.read_amount == 0 {
				// If we have read nothing yet, there is no data
				// but ogg data, meaning the stream ends legally
				// and without corruption.
				return Ok(Res::Eof);
			} else {
				// There is most likely a corruption here.
				// I'm not sure, but the ogg spec doesn't say that
				// random data past the last ogg page is allowed,
				// so we just assume it's not allowed.
				tri!(Err(OggReadError::NoCapturePatternFound));
			}
		}
		self.read_amount += rd_len;

		// 150 kb gives us a bit of safety: we can survive
		// up to one page with a corrupted capture pattern
		// after having seeked right after a capture pattern
		// of an earlier page.
		let read_amount_max = 150 * 1024;
		if self.read_amount > read_amount_max {
			// Exhaustive searching for the capture pattern
			// has returned no ogg capture pattern.
			tri!(Err(OggReadError::NoCapturePatternFound));
		}

		let rd_buf = &buf[0 .. rd_len];

		use std::cmp::min;
		// `off` is the position inside `rd_buf` where the header bytes to copy begin,
		// `needed` is the number of bytes still missing in the return buffer.
		let (off, needed) = match self.mode {
			Searching => match self.check_arr(rd_buf) {
				// Capture pattern found
				Some(off) => {
					self.ret_buf[0] = b'O';
					self.ret_buf[1] = b'g';
					self.ret_buf[2] = b'g';
					self.ret_buf[3] = b'S'; // (Not actually needed)
					// `off` points to the 'S', so the 'S' is part of the
					// 24 bytes that get copied to the end of the return buffer.
					(off, 24)
				},
				// Nothing found
				None => return Ok(Res::ReadNeeded),
			},
			FoundWithNeeded(needed) => {
				(0, needed as usize)
			},
			_ => unimplemented!(),
		};

		let fnd_buf = &rd_buf[off..];

		let copy_amount = min(needed, fnd_buf.len());
		// The missing bytes are always the last `needed` ones of the return buffer.
		let start_fill = 27 - needed;
		(&mut self.ret_buf[start_fill .. copy_amount + start_fill])
				.copy_from_slice(&fnd_buf[0 .. copy_amount]);
		if fnd_buf.len() == needed {
			// Capture pattern found!
			self.mode = Found;
			return Ok(Res::Found);
		} else if fnd_buf.len() < needed {
			// We still have to read some content.
			let needed_new = needed - copy_amount;
			self.mode = FoundWithNeeded(needed_new as u8);
			return Ok(Res::ReadNeeded);
		} else {
			// We have read too much content (exceeding the header).
			// Seek back so that we are at the position
			// right after the header.

			// The stored offset is negative, as more than `needed` bytes are available.
			self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32);
			return Ok(Res::SeekNeeded);
		}
	}
do_seek<S :Seek>(&mut self, mut skr :S) -> Result<UntilPageHeaderResult, OggReadError>682 	pub fn do_seek<S :Seek>(&mut self, mut skr :S)
683 			-> Result<UntilPageHeaderResult, OggReadError> {
684 		use self::UntilPageHeaderReaderMode::*;
685 		use self::UntilPageHeaderResult as Res;
686 		match self.mode {
687 			Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded),
688 			SeekNeeded(offs) => {
689 				tri!(skr.seek(SeekFrom::Current(offs as i64)));
690 				self.mode = Found;
691 				Ok(Res::Found)
692 			},
693 			Found => Ok(Res::Found),
694 		}
695 	}
into_header(self) -> [u8; 27]696 	pub fn into_header(self) -> [u8; 27] {
697 		use self::UntilPageHeaderReaderMode::*;
698 		match self.mode {
699 			Found => self.ret_buf,
700 			_ => panic!("wrong mode"),
701 		}
702 	}
703 }
704 
/**
Reader for packets from an Ogg stream.

This reads codec packets belonging to several different logical streams from one physical Ogg container stream.

This reader is not async ready. It does not keep its internal state
consistent when it encounters the `WouldBlock` error kind.
If you desire async functionality, consider enabling the `async` feature
and look into the async module.
*/
pub struct PacketReader<T :io::Read + io::Seek> {
	/// The wrapped reader that the pages are read from
	rdr :T,

	/// Caches the read pages and splits them up into packets
	base_pck_rdr :BasePacketReader,
}
720 
721 impl<T :io::Read + io::Seek> PacketReader<T> {
722 	/// Constructs a new `PacketReader` with a given `Read`.
new(rdr :T) -> PacketReader<T>723 	pub fn new(rdr :T) -> PacketReader<T> {
724 		PacketReader { rdr, base_pck_rdr : BasePacketReader::new() }
725 	}
726 	/// Returns the wrapped reader, consuming the `PacketReader`.
into_inner(self) -> T727 	pub fn into_inner(self) -> T {
728 		self.rdr
729 	}
730 	/// Reads a packet, and returns it on success.
731 	///
732 	/// Ok(None) is returned if the physical stream has ended.
read_packet(&mut self) -> Result<Option<Packet>, OggReadError>733 	pub fn read_packet(&mut self) -> Result<Option<Packet>, OggReadError> {
734 		// Read pages until we got a valid entire packet
735 		// (packets may span multiple pages, so reading one page
736 		// doesn't always suffice to give us a valid packet)
737 		loop {
738 			if let Some(pck) = self.base_pck_rdr.read_packet() {
739 				return Ok(Some(pck));
740 			}
741 			let page = tri!(self.read_ogg_page());
742 			match page {
743 				Some(page) => tri!(self.base_pck_rdr.push_page(page)),
744 				None => return Ok(None),
745 			}
746 		}
747 	}
748 	/// Reads a packet, and returns it on success.
749 	///
750 	/// The difference to the `read_packet` function is that this function
751 	/// returns an Err(_) if the physical stream has ended.
752 	/// This function is useful if you expect a new packet to come.
read_packet_expected(&mut self) -> Result<Packet, OggReadError>753 	pub fn read_packet_expected(&mut self) -> Result<Packet, OggReadError> {
754 		match tri!(self.read_packet()) {
755 			Some(p) => Ok(p),
756 			None => tri!(Err(Error::new(ErrorKind::UnexpectedEof,
757 				"Expected ogg packet but found end of physical stream"))),
758 		}
759 	}
760 
761 	/// Reads until the new page header, and then returns the page header array.
762 	///
763 	/// If no new page header is immediately found, it performs a "recapture",
764 	/// meaning it searches for the capture pattern, and if it finds it, it
765 	/// reads the complete first 27 bytes of the header, and returns them.
766 	///
767 	/// Ok(None) is returned if the stream has ended without an uncompleted page
768 	/// or non page data after the last page (if any) present.
read_until_pg_header(&mut self) -> Result<Option<[u8; 27]>, OggReadError>769 	fn read_until_pg_header(&mut self) -> Result<Option<[u8; 27]>, OggReadError> {
770 		let mut r = UntilPageHeaderReader::new();
771 		use self::UntilPageHeaderResult::*;
772 		let mut res = tri!(r.do_read(&mut self.rdr));
773 		loop {
774 			res = match res {
775 				Eof => return Ok(None),
776 				Found => break,
777 				ReadNeeded => tri!(r.do_read(&mut self.rdr)),
778 				SeekNeeded => tri!(r.do_seek(&mut self.rdr))
779 			}
780 		}
781 		Ok(Some(r.into_header()))
782 	}
783 
784 	/// Parses and reads a new OGG page
785 	///
786 	/// To support seeking this does not assume that the capture pattern
787 	/// is at the current reader position.
788 	/// Instead it searches until it finds the capture pattern.
read_ogg_page(&mut self) -> Result<Option<OggPage>, OggReadError>789 	fn read_ogg_page(&mut self) -> Result<Option<OggPage>, OggReadError> {
790 		let header_buf :[u8; 27] = match tri!(self.read_until_pg_header()) {
791 			Some(s) => s,
792 			None => return Ok(None)
793 		};
794 		let (mut pg_prs, page_segments) = tri!(PageParser::new(header_buf));
795 
796 		let mut segments_buf = vec![0; page_segments]; // TODO fix this, we initialize memory for NOTHING!!! Out of some reason, this is seen as "unsafe" by rustc.
797 		tri!(self.rdr.read_exact(&mut segments_buf));
798 
799 		let page_siz = pg_prs.parse_segments(segments_buf);
800 
801 		let mut packet_data = vec![0; page_siz as usize];
802 		tri!(self.rdr.read_exact(&mut packet_data));
803 
804 		Ok(Some(tri!(pg_prs.parse_packet_data(packet_data))))
805 	}
806 
807 	/// Seeks the underlying reader
808 	///
809 	/// Seeks the reader that this PacketReader bases on by the specified
810 	/// number of bytes. All new pages will be read from the new position.
811 	///
812 	/// This also flushes all the unread packets in the queue.
seek_bytes(&mut self, pos :SeekFrom) -> Result<u64, Error>813 	pub fn seek_bytes(&mut self, pos :SeekFrom) -> Result<u64, Error> {
814 		let r = tri!(self.rdr.seek(pos));
815 		// Reset the internal state
816 		self.base_pck_rdr.update_after_seek();
817 		return Ok(r);
818 	}
819 
820 	/// Seeks to absolute granule pos
821 	///
822 	/// More specifically, it seeks to the first Ogg page
823 	/// that has an `absgp` greater or equal to the specified one.
824 	/// In the case of continued packets, the seek operation may also end up
825 	/// at the last page that comes before such a page and has a packet start.
826 	///
827 	/// The passed `stream_serial` parameter controls the stream
828 	/// serial number to filter our search for. If it's `None`, no
829 	/// filtering is applied, but if it is `Some(n)`, we filter for
830 	/// streams with the serial number `n`.
831 	/// Note that the `None` case is only intended for streams
832 	/// where only one logical stream exists, the seek may misbehave
833 	/// if `Ǹone` gets passed when multiple streams exist.
834 	///
835 	/// The returned bool indicates whether the seek was successful.
seek_absgp(&mut self, stream_serial :Option<u32>, pos_goal :u64) -> Result<bool, OggReadError>836 	pub fn seek_absgp(&mut self, stream_serial :Option<u32>,
837 			pos_goal :u64) -> Result<bool, OggReadError> {
838 		macro_rules! found {
839 			($pos:expr) => {{
840 				// println!("found: {}", $pos);
841 				tri!(self.rdr.seek(SeekFrom::Start($pos)));
842 				self.base_pck_rdr.update_after_seek();
843 				return Ok(true);
844 			}};
845 		}
846 		macro_rules! bt {
847 			($e:expr) => {{
848 				match tri!($e) {
849 					Some(s) => s,
850 					None => return Ok(false),
851 				}
852 			}};
853 		}
854 		// The task of this macro is to read to the
855 		// end of the logical stream. For optimisation reasons,
856 		// it returns early if we found our goal
857 		// or any page past it.
858 		macro_rules! pg_read_until_end_or_goal {
859 			{$goal:expr} => {{
860 				let mut pos;
861 				let mut pg;
862 				loop {
863 					let (n_pos, n_pg) = pg_read_match_serial!();
864 					pos = n_pos;
865 					pg = n_pg;
866 					// If the absgp matches our goal, the seek process is done.
867 					// This is a nice shortcut as we don't need to perform
868 					// the remainder of the seek process any more.
869 					// Of course, an exact match only happens in the fewest
870 					// of cases
871 					if pg.0.bi.absgp == $goal {
872 						found!(pos);
873 					}
874 					// If we found a page past our goal, we already
875 					// found a position that can serve as end post of the search.
876 					if pg.0.bi.absgp > $goal {
877 						break;
878 					}
879 					// Stop the search if the stream has ended.
880 					if pg.0.bi.last_page {
881 						return Ok(false)
882 					}
883 					// If the page is not interesting, seek over it.
884 				}
885 				(pos, pg)
886 			}};
887 		}
888 		macro_rules! pg_read_match_serial {
889 			{} => {{
890 				let mut pos;
891 				let mut pg;
892 				let mut continued_pck_start = None;
893 				loop {
894 					pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
895 					pg = bt!(self.read_ogg_page());
896 					/*println!("absgp {} serial {} wh {} pe {} @ {}",
897 						pg.0.bi.absgp, pg.0.bi.sequence_num,
898 						pg.has_whole_packet(), pg.has_packet_end(), pos);// */
899 
900 					match stream_serial {
901 						// Continue the search if we encounter a
902 						// page with a different stream serial
903 						Some(s) if pg.0.stream_serial != s => (),
904 						_ => match continued_pck_start {
905 								None if pg.has_whole_packet() => break,
906 								None if pg.has_packet_start() => {
907 									continued_pck_start = Some(pos);
908 								},
909 								Some(s) if pg.has_packet_end() => {
910 									// We have remembered a packet start,
911 									// and have just encountered a packet end.
912 									// Return the position of the start with the
913 									// info from the end (for the absgp).
914 									pos = s;
915 									break;
916 								},
917 								_ => (),
918 						},
919 					}
920 				}
921 				(pos, pg)
922 			}};
923 		}
924 
925 		// Bisect seeking algo.
926 		// Start by finding boundaries, e.g. at the start and
927 		// end of the file, then bisect those boundaries successively
928 		// until a page is found.
929 
930 		//println!("seek start. goal = {}", pos_goal);
931 		let ab_of = |pg :&OggPage| { pg.0.bi.absgp };
932 		let seq_of = |pg :&OggPage| { pg.0.bi.sequence_num };
933 
934 		// First, find initial "boundaries"
935 		// Seek to the start of the file to get the starting boundary
936 		tri!(self.rdr.seek(SeekFrom::Start(0)));
937 		let (mut begin_pos, mut begin_pg) = pg_read_match_serial!();
938 
939 		// If the goal is the beginning, we are done.
940 		if pos_goal == 0 {
941 			//println!("Seeking to the beginning of the stream - skipping bisect.");
942 			found!(begin_pos);
943 		}
944 
945 		// Seek to the end of the file to get the ending boundary
946 		// TODO the 200 KB is just a guessed number, any ideas
947 		// to improve it?
948 		tri!(seek_before_end(&mut self.rdr, 200 * 1024));
949 		let (mut end_pos, mut end_pg) = pg_read_until_end_or_goal!(pos_goal);
950 
951 		// Then perform the bisection
952 		loop {
953 			// Search is done if the two limits are the same page,
954 			// or consecutive pages.
955 			if seq_of(&end_pg) - seq_of(&begin_pg) <= 1 {
956 				found!(end_pos);
957 			}
958 			// Perform the bisection step
959 			let pos_to_seek = begin_pos + (end_pos - begin_pos) / 2;
960 			tri!(self.rdr.seek(SeekFrom::Start(pos_to_seek)));
961 			let (pos, pg) = pg_read_match_serial!();
962 			/*println!("seek {} {} . {} @ {} {} . {}",
963 				ab_of(&begin_pg), ab_of(&end_pg), ab_of(&pg),
964 				begin_pos, end_pos, pos);// */
965 
966 			if seq_of(&end_pg) == seq_of(&pg) ||
967 					seq_of(&begin_pg) == seq_of(&pg) {
968 				//println!("switching to linear.");
969 				// The bisection seek doesn't bring us any further.
970 				// Switch to a linear seek to get the last details.
971 				let mut pos;
972 				let mut pg;
973 				let mut last_packet_end_pos = begin_pos;
974 				tri!(self.rdr.seek(SeekFrom::Start(begin_pos)));
975 				loop {
976 					pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
977 					pg = bt!(self.read_ogg_page());
978 					/*println!("absgp {} pck_start {} whole_pck {} pck_end {} @ {} {}",
979 						ab_of(&pg), pg.has_packet_start(), pg.has_whole_packet(),
980 						pg.has_packet_end(),
981 						pos, last_packet_end_pos);// */
982 					match stream_serial {
983 						// Continue the search if we encounter a
984 						// page with a different stream serial,
985 						// or one with an absgp of -1.
986 						Some(s) if pg.0.stream_serial != s => (),
987 						_ if ab_of(&pg) == -1i64 as u64 => (),
988 						// The page is found if the absgp is >= our goal
989 						_ if ab_of(&pg) >= pos_goal => found!(last_packet_end_pos),
990 						// If we encounter a page with a packet start,
991 						// update accordingly.
992 						_ => if pg.has_packet_end() {
993 							last_packet_end_pos = pos;
994 						},
995 					}
996 				}
997 			}
998 			if ab_of(&pg) >= pos_goal {
999 				end_pos = pos;
1000 				end_pg = pg;
1001 			} else {
1002 				begin_pos = pos;
1003 				begin_pg = pg;
1004 			}
1005 		}
1006 	}
1007 	/// Resets the internal state by deleting all
1008 	/// unread packets.
delete_unread_packets(&mut self)1009 	pub fn delete_unread_packets(&mut self) {
1010 		self.base_pck_rdr.update_after_seek();
1011 	}
1012 }
1013 
1014 // util function
seek_before_end<T :io::Read + io::Seek>(mut rdr :T, offs :u64) -> Result<u64, OggReadError>1015 fn seek_before_end<T :io::Read + io::Seek>(mut rdr :T,
1016 		offs :u64) -> Result<u64, OggReadError> {
1017 	let end_pos = tri!(rdr.seek(SeekFrom::End(0)));
1018 	let end_pos_to_seek = ::std::cmp::min(end_pos, offs);
1019 	return Ok(tri!(rdr.seek(SeekFrom::End(-(end_pos_to_seek as i64)))));
1020 }
1021 
1022 #[cfg(feature = "async")]
1023 /**
Asynchronous ogg decoding
1025 */
1026 pub mod async_api {
1027 	#![allow(deprecated)]
1028 
1029 	use super::*;
1030 	use tokio_io::AsyncRead;
1031 	use tokio_io::codec::{Decoder, FramedRead};
1032 	use futures::stream::Stream;
1033 	use futures::{Async, Poll};
1034 	use bytes::BytesMut;
1035 
1036 	enum PageDecodeState {
1037 		Head,
1038 		Segments(PageParser, usize),
1039 		PacketData(PageParser, usize),
1040 		InUpdate,
1041 	}
1042 
1043 	impl PageDecodeState {
needed_size(&self) -> usize1044 		fn needed_size(&self) -> usize {
1045 			match self {
1046 				&PageDecodeState::Head => 27,
1047 				&PageDecodeState::Segments(_, s) => s,
1048 				&PageDecodeState::PacketData(_, s) => s,
1049 				&PageDecodeState::InUpdate => panic!("invalid state"),
1050 			}
1051 		}
1052 	}
1053 
1054 	/**
1055 	Async page reading functionality.
1056 	*/
1057 	struct PageDecoder {
1058 		state : PageDecodeState,
1059 	}
1060 
1061 	impl PageDecoder {
new() -> Self1062 		fn new() -> Self {
1063 			PageDecoder {
1064 				state : PageDecodeState::Head,
1065 			}
1066 		}
1067 	}
1068 
1069 	impl Decoder for PageDecoder {
1070 		type Item = OggPage;
1071 		type Error = OggReadError;
1072 
decode(&mut self, buf :&mut BytesMut) -> Result<Option<OggPage>, OggReadError>1073 		fn decode(&mut self, buf :&mut BytesMut) ->
1074 				Result<Option<OggPage>, OggReadError> {
1075 			use self::PageDecodeState::*;
1076 			loop {
1077 				let needed_size = self.state.needed_size();
1078 				if buf.len() < needed_size {
1079 					return Ok(None);
1080 				}
1081 				let mut ret = None;
1082 				let consumed_buf = buf.split_to(needed_size).to_vec();
1083 
1084 				self.state = match ::std::mem::replace(&mut self.state, InUpdate) {
1085 					Head => {
1086 						let mut hdr_buf = [0; 27];
1087 						// TODO once we have const generics, the copy below can be done
1088 						// much nicer, maybe with a new into_array fn on Vec's
1089 						hdr_buf.copy_from_slice(&consumed_buf);
1090 						let tup = tri!(PageParser::new(hdr_buf));
1091 						Segments(tup.0, tup.1)
1092 					},
1093 					Segments(mut pg_prs, _) => {
1094 						let new_needed_len = pg_prs.parse_segments(consumed_buf);
1095 						PacketData(pg_prs, new_needed_len)
1096 					},
1097 					PacketData(pg_prs, _) => {
1098 						ret = Some(tri!(pg_prs.parse_packet_data(consumed_buf)));
1099 						Head
1100 					},
1101 					InUpdate => panic!("invalid state"),
1102 				};
1103 				if ret.is_some() {
1104 					return Ok(ret);
1105 				}
1106 			}
1107 		}
1108 
decode_eof(&mut self, buf :&mut BytesMut) -> Result<Option<OggPage>, OggReadError>1109 		fn decode_eof(&mut self, buf :&mut BytesMut) ->
1110 				Result<Option<OggPage>, OggReadError> {
1111 			// Ugly hack for "bytes remaining on stream" error
1112 			return self.decode(buf);
1113 		}
1114 	}
1115 
1116 	/**
1117 	Async packet reading functionality.
1118 	*/
1119 	pub struct PacketReader<T> where T :AsyncRead {
1120 		base_pck_rdr :BasePacketReader,
1121 		pg_rd :FramedRead<T, PageDecoder>,
1122 	}
1123 
1124 	impl<T :AsyncRead> PacketReader<T> {
new(inner :T) -> Self1125 		pub fn new(inner :T) -> Self {
1126 			PacketReader {
1127 				base_pck_rdr : BasePacketReader::new(),
1128 				pg_rd : FramedRead::new(inner, PageDecoder::new()),
1129 			}
1130 		}
1131 	}
1132 
1133 	impl<T :AsyncRead> Stream for PacketReader<T> {
1134 		type Item = Packet;
1135 		type Error = OggReadError;
1136 
poll(&mut self) -> Poll<Option<Packet>, OggReadError>1137 		fn poll(&mut self) -> Poll<Option<Packet>, OggReadError> {
1138 			// Read pages until we got a valid entire packet
1139 			// (packets may span multiple pages, so reading one page
1140 			// doesn't always suffice to give us a valid packet)
1141 			loop {
1142 				if let Some(pck) = self.base_pck_rdr.read_packet() {
1143 					return Ok(Async::Ready(Some(pck)));
1144 				}
1145 				let page = try_ready!(self.pg_rd.poll());
1146 				match page {
1147 					Some(page) => tri!(self.base_pck_rdr.push_page(page)),
1148 					None => return Ok(Async::Ready(None)),
1149 				}
1150 			}
1151 		}
1152 	}
1153 
1154 }
1155