1 // Ogg decoder and encoder written in Rust
2 //
3 // Copyright (c) 2016-2017 est31 <MTest31@outlook.com>
4 // and contributors. All rights reserved.
5 // Redistribution or use only under the terms
6 // specified in the LICENSE file attached to this
7 // source distribution.
8
9 /*!
Reading logic: parsing Ogg pages and reassembling the packets they carry.
11 */
12
13 use std::error;
14 use std::io;
15 use std::io::{Cursor, Read, Write, SeekFrom, Error, ErrorKind};
16 use byteorder::{ReadBytesExt, LittleEndian};
17 use std::collections::HashMap;
18 use std::collections::hash_map::Entry;
19 use std::fmt::{Display, Formatter, Error as FmtError};
20 use std::mem::replace;
21 use crc::vorbis_crc32_update;
22 use Packet;
23 use std::io::Seek;
24
/// Error that can be raised when decoding an Ogg transport.
#[derive(Debug)]
pub enum OggReadError {
	/// The capture pattern for a new page was not found
	/// where one was expected.
	NoCapturePatternFound,
	/// Invalid stream structure version, with the given one
	/// attached.
	///
	/// Only version 0 is accepted when parsing a page header.
	InvalidStreamStructVer(u8),
	/// Mismatch of the hash value with (expected, calculated) value.
	///
	/// The expected value is the CRC32 stored in the page header, the
	/// calculated one is computed over the page header (with its checksum
	/// field zeroed), the segment table and the page body.
	HashMismatch(u32, u32),
	/// I/O error occurred.
	ReadError(io::Error),
	/// Some constraint required by the spec was not met.
	InvalidData,
}
41
42 impl OggReadError {
description_str(&self) -> &str43 fn description_str(&self) -> &str {
44 match *self {
45 OggReadError::NoCapturePatternFound => "No Ogg capture pattern found",
46 OggReadError::InvalidStreamStructVer(_) =>
47 "A non zero stream structure version was passed",
48 OggReadError::HashMismatch(_, _) => "CRC32 hash mismatch",
49 OggReadError::ReadError(_) => "I/O error",
50 OggReadError::InvalidData => "Constraint violated",
51 }
52 }
53 }
54
55 impl error::Error for OggReadError {
description(&self) -> &str56 fn description(&self) -> &str {
57 self.description_str()
58 }
59
cause(&self) -> Option<&dyn error::Error>60 fn cause(&self) -> Option<&dyn error::Error> {
61 match *self {
62 OggReadError::ReadError(ref err) => Some(err as &dyn error::Error),
63 _ => None
64 }
65 }
66 }
67
68 impl Display for OggReadError {
fmt(&self, fmt :&mut Formatter) -> Result<(), FmtError>69 fn fmt(&self, fmt :&mut Formatter) -> Result<(), FmtError> {
70 write!(fmt, "{}", Self::description_str(self))
71 }
72 }
73
74 impl From<io::Error> for OggReadError {
from(err :io::Error) -> OggReadError75 fn from(err :io::Error) -> OggReadError {
76 return OggReadError::ReadError(err);
77 }
78 }
79
/// Information about an OGG page that is shared between multiple places:
/// it is produced by `PageParser` and kept per stream inside `PageInfo`.
struct PageBaseInfo {
	/// `true`: the first packet is continued from the page before. `false`: if it's a "fresh" one
	/// (header type flag bit `0x01`)
	starts_with_continued :bool,
	/// `true` if this page is the first one in the logical bitstream
	/// (header type flag bit `0x02`)
	first_page :bool,
	/// `true` if this page is the last one in the logical bitstream
	/// (header type flag bit `0x04`)
	last_page :bool,
	/// Absolute granule position. The codec defines further meaning.
	absgp :u64,
	/// Page counter (the page sequence number from the header)
	sequence_num :u32,
	/// Packet information:
	/// index is number of packet,
	/// tuple is (offset, length) of packet, both in bytes and relative
	/// to the start of the page body.
	/// if ends_with_continued is true, the last element will contain information
	/// about the continued packet
	packet_positions :Vec<(u16,u16)>,
	/// `true` if the packet is continued in subsequent page(s)
	/// `false` if the packet has a segment of length < 255 inside this page
	ends_with_continued :bool,
}
102
/// Internal helper struct for the packet reader state: the most recently
/// pushed page of one logical stream, plus the parts of a packet that
/// started on earlier pages.
struct PageInfo {
	/// Basic information about the last read page
	bi :PageBaseInfo,
	/// The index (into `bi.packet_positions`) of the first "unread" packet
	packet_idx :u8,
	/// Contains the packet data of the page (the page body)
	page_body :Vec<u8>,

	/// If there is a residue from previous pages in terms of a packet spanning multiple
	/// pages, this field contains it, one entry per page. Having this Vec<Vec<u8>> and
	/// not Vec<u8> ensures to give us O(n) complexity, not O(n^2)
	/// for `n` as number of pages that the packet is contained in.
	last_overlap_pck :Vec<Vec<u8>>,
}
118
119 impl PageInfo {
120 /// Returns `true` if the first "unread" packet is the first one
121 /// in the page, `false` otherwise.
is_first_pck_in_pg(&self) -> bool122 fn is_first_pck_in_pg(&self) -> bool {
123 return self.packet_idx == 0;
124 }
125 /// Returns `true` if the first "unread" packet is the last one
126 /// in the page, `false` otherwise.
127 /// If the first "unread" packet isn't completed in this page
128 /// (spans page borders), this returns `false`.
is_last_pck_in_pg(&self) -> bool129 fn is_last_pck_in_pg(&self) -> bool {
130 return (self.packet_idx + 1 + (self.bi.ends_with_continued as u8)) as usize
131 == self.bi.packet_positions.len();
132 }
133 }
134
/// Contains a fully parsed OGG page.
///
/// Produced by `PageParser::parse_packet_data` once the page's CRC has been
/// checked (the check is skipped when built with `cfg(fuzzing)`), and
/// consumed by `BasePacketReader::push_page`.
pub struct OggPage(PageParser);
137
138 impl OggPage {
139 /// Returns whether there is an ending packet in the page
has_packet_end(&self) -> bool140 fn has_packet_end(&self) -> bool {
141 (self.0.bi.packet_positions.len() -
142 self.0.bi.ends_with_continued as usize) > 0
143 }
144 /// Returns whether there is a packet that both
145 /// starts and ends inside the page
has_whole_packet(&self) -> bool146 fn has_whole_packet(&self) -> bool {
147 self.0.bi.packet_positions.len().saturating_sub(
148 self.0.bi.ends_with_continued as usize +
149 self.0.bi.starts_with_continued as usize) > 0
150 }
151 /// Returns whether there is a starting packet in the page
has_packet_start(&self) -> bool152 fn has_packet_start(&self) -> bool {
153 (self.0.bi.packet_positions.len() -
154 self.0.bi.starts_with_continued as usize) > 0
155 }
156 }
157
158 /**
159 Helper struct for parsing pages
160
161 It's created using the `new` function and then it's fed more data via the `parse_segments`
162 and `parse_packet_data` functions, each called exactly once and in that precise order.
163
164 Then later code uses the `OggPage` returned by the `parse_packet_data` function.
165 */
166 pub struct PageParser {
167 // Members packet_positions, ends_with_continued and packet_count
168 // get populated after segments have been parsed
169 bi :PageBaseInfo,
170
171 stream_serial :u32,
172 checksum :u32,
173 header_buf: [u8; 27],
174 /// Number of packet ending segments
175 packet_count :u16, // Gets populated gafter segments have been parsed
176 /// after segments have been parsed, this contains the segments buffer,
177 /// after the packet data have been read, this contains the packets buffer.
178 segments_or_packets_buf :Vec<u8>,
179 }
180
181 impl PageParser {
182 /// Creates a new Page parser
183 ///
184 /// The `header_buf` param contains the first 27 bytes of a new OGG page.
185 /// Determining when one begins is your responsibility. Usually they
186 /// begin directly after the end of a previous OGG page, but
187 /// after you've performed a seek you might end up within the middle of a page
188 /// and need to recapture.
189 ///
190 /// Returns a page parser, and the requested size of the segments array.
191 /// You should allocate and fill such an array, in order to pass it to the `parse_segments`
192 /// function.
new(header_buf :[u8; 27]) -> Result<(PageParser, usize), OggReadError>193 pub fn new(header_buf :[u8; 27]) -> Result<(PageParser, usize), OggReadError> {
194 let mut header_rdr = Cursor::new(header_buf);
195 header_rdr.set_position(4);
196 let stream_structure_version = tri!(header_rdr.read_u8());
197 if stream_structure_version != 0 {
198 tri!(Err(OggReadError::InvalidStreamStructVer(stream_structure_version)));
199 }
200 let header_type_flag = header_rdr.read_u8().unwrap();
201 let stream_serial;
202
203 Ok((PageParser {
204 bi : PageBaseInfo {
205 starts_with_continued : header_type_flag & 0x01u8 != 0,
206 first_page : header_type_flag & 0x02u8 != 0,
207 last_page : header_type_flag & 0x04u8 != 0,
208 absgp : header_rdr.read_u64::<LittleEndian>().unwrap(),
209 sequence_num : {
210 stream_serial = header_rdr.read_u32::<LittleEndian>().unwrap();
211 header_rdr.read_u32::<LittleEndian>().unwrap()
212 },
213 packet_positions : Vec::new(),
214 ends_with_continued : false,
215 },
216 stream_serial,
217 checksum : header_rdr.read_u32::<LittleEndian>().unwrap(),
218 header_buf,
219 packet_count : 0,
220 segments_or_packets_buf :Vec::new(),
221 },
222 // Number of page segments
223 header_rdr.read_u8().unwrap() as usize
224 ))
225 }
226
227 /// Parses the segments buffer, and returns the requested size
228 /// of the packets content array.
229 ///
230 /// You should allocate and fill such an array, in order to pass it to the `parse_packet_data`
231 /// function.
parse_segments(&mut self, segments_buf :Vec<u8>) -> usize232 pub fn parse_segments(&mut self, segments_buf :Vec<u8>) -> usize {
233 let mut page_siz :u16 = 0; // Size of the page's body
234 // Whether our page ends with a continued packet
235 self.bi.ends_with_continued = self.bi.starts_with_continued;
236
237 // First run: get the number of packets,
238 // whether the page ends with a continued packet,
239 // and the size of the page's body
240 for val in &segments_buf {
241 page_siz += *val as u16;
242 // Increment by 1 if val < 255, otherwise by 0
243 self.packet_count += (*val < 255) as u16;
244 self.bi.ends_with_continued = !(*val < 255);
245 }
246
247 let mut packets = Vec::with_capacity(self.packet_count as usize
248 + self.bi.ends_with_continued as usize);
249 let mut cur_packet_siz :u16 = 0;
250 let mut cur_packet_offs :u16 = 0;
251
252 // Second run: get the offsets of the packets
253 // Not that we need it right now, but it's much more fun this way, am I right
254 for val in &segments_buf {
255 cur_packet_siz += *val as u16;
256 if *val < 255 {
257 packets.push((cur_packet_offs, cur_packet_siz));
258 cur_packet_offs += cur_packet_siz;
259 cur_packet_siz = 0;
260 }
261 }
262 if self.bi.ends_with_continued {
263 packets.push((cur_packet_offs, cur_packet_siz));
264 }
265
266 self.bi.packet_positions = packets;
267 self.segments_or_packets_buf = segments_buf;
268 page_siz as usize
269 }
270
271 /// Parses the packets data and verifies the checksum.
272 ///
273 /// Returns an `OggPage` to be used by later code.
parse_packet_data(mut self, packet_data :Vec<u8>) -> Result<OggPage, OggReadError>274 pub fn parse_packet_data(mut self, packet_data :Vec<u8>) ->
275 Result<OggPage, OggReadError> {
276 // Now to hash calculation.
277 // 1. Clear the header buffer
278 self.header_buf[22] = 0;
279 self.header_buf[23] = 0;
280 self.header_buf[24] = 0;
281 self.header_buf[25] = 0;
282
283 // 2. Calculate the hash
284 let mut hash_calculated :u32;
285 hash_calculated = vorbis_crc32_update(0, &self.header_buf);
286 hash_calculated = vorbis_crc32_update(hash_calculated,
287 &self.segments_or_packets_buf);
288 hash_calculated = vorbis_crc32_update(hash_calculated, &packet_data);
289
290 // 3. Compare to the extracted one
291 if self.checksum != hash_calculated {
292 // Do not verify checksum when the decoder is being fuzzed.
293 // This allows random input from fuzzers reach decoding code that's actually interesting,
294 // instead of being rejected early due to checksum mismatch.
295 if !cfg!(fuzzing) {
296 tri!(Err(OggReadError::HashMismatch(self.checksum, hash_calculated)));
297 }
298 }
299 self.segments_or_packets_buf = packet_data;
300 Ok(OggPage(self))
301 }
302 }
303
304 /**
305 Low level struct for reading from an Ogg stream.
306
307 Note that most times you'll want the higher level `PacketReader` struct.
308
309 It takes care of most of the internal parsing and logic, you
310 will only have to take care of handing over your data.
311
312 Essentially, it manages a cache of package data for each logical
313 bitstream, and when the cache of every logical bistream is empty,
314 it asks for a fresh page. You will then need to feed the struct
315 one via the `push_page` function.
316
317 All functions on this struct are async ready.
318 They get their data fed, instead of calling and blocking
319 in order to get it.
320 */
321 pub struct BasePacketReader {
322 // TODO the hashmap plus the set is perhaps smart ass perfect design but could be made more performant I guess...
323 // I mean: in > 99% of all cases we'll just have one or two streams.
324 // AND: their setup changes only very rarely.
325
326 /// Contains info about all logical streams that
327 page_infos :HashMap<u32, PageInfo>,
328
329 /// Contains the stream_serial of the stream that contains some unprocessed packet data.
330 /// There is always <= 1, bc if there is one, no new pages will be read, so there is no chance for a second to be added
331 /// None if there is no such stream and one has to read a new page.
332 stream_with_stuff :Option<u32>,
333
334 // Bool that is set to true when a seek of the stream has occured.
335 // This helps validator code to decide whether to accept certain strange data.
336 has_seeked :bool,
337 }
338
339 impl BasePacketReader {
340 /// Constructs a new blank `BasePacketReader`.
341 ///
342 /// You can feed it data using the `push_page` function, and
343 /// obtain data using the `read_packet` function.
new() -> Self344 pub fn new() -> Self {
345 BasePacketReader { page_infos: HashMap::new(),
346 stream_with_stuff: None, has_seeked: false }
347 }
348 /// Extracts a packet from the cache, if the cache contains valid packet data,
349 /// otherwise it returns `None`.
350 ///
351 /// If this function returns `None`, you'll need to add a page to the cache
352 /// by using the `push_page` function.
read_packet(&mut self) -> Option<Packet>353 pub fn read_packet(&mut self) -> Option<Packet> {
354 if self.stream_with_stuff == None {
355 return None;
356 }
357 let str_serial :u32 = self.stream_with_stuff.unwrap();
358 let pg_info = self.page_infos.get_mut(&str_serial).unwrap();
359 let (offs, len) = pg_info.bi.packet_positions[pg_info.packet_idx as usize];
360 // If there is a continued packet, and we are at the start right now,
361 // and we actually have its end in the current page, glue it together.
362 let need_to_glue = pg_info.packet_idx == 0 &&
363 pg_info.bi.starts_with_continued &&
364 !(pg_info.bi.ends_with_continued && pg_info.bi.packet_positions.len() == 1);
365 let packet_content :Vec<u8> = if need_to_glue {
366 // First find out the size of our spanning packet
367 let mut siz :usize = 0;
368 for pck in pg_info.last_overlap_pck.iter() {
369 siz += pck.len();
370 }
371 siz += len as usize;
372 let mut cont :Vec<u8> = Vec::with_capacity(siz);
373
374 // Then do the copying
375 for pck in pg_info.last_overlap_pck.iter() {
376 cont.write_all(pck).unwrap();
377 }
378 // Now reset the overlap container again
379 pg_info.last_overlap_pck = Vec::new();
380 cont.write_all(&pg_info.page_body[offs as usize .. (offs + len) as usize]).unwrap();
381
382 cont
383 } else {
384 let mut cont :Vec<u8> = Vec::with_capacity(len as usize);
385 // TODO The copy below is totally unneccessary. It is only needed so that we don't have to carry around the old Vec's.
386 // TODO get something like the shared_slice crate for RefCells, so that we can also have mutable data, shared through
387 // slices.
388 let cont_slice :&[u8] = &pg_info.page_body[offs as usize .. (offs + len) as usize];
389 cont.write_all(cont_slice).unwrap();
390 cont
391 };
392
393 let first_pck_in_pg = pg_info.is_first_pck_in_pg();
394 let first_pck_overall = pg_info.bi.first_page && first_pck_in_pg;
395
396 let last_pck_in_pg = pg_info.is_last_pck_in_pg();
397 let last_pck_overall = pg_info.bi.last_page && last_pck_in_pg;
398
399 // Update the last read index.
400 pg_info.packet_idx += 1;
401 // Set stream_with_stuff to None so that future packet reads
402 // yield a page read first
403 if last_pck_in_pg {
404 self.stream_with_stuff = None;
405 }
406
407 return Some(Packet {
408 data: packet_content,
409 first_packet_pg: first_pck_in_pg,
410 first_packet_stream: first_pck_overall,
411 last_packet_pg: last_pck_in_pg,
412 last_packet_stream: last_pck_overall,
413 absgp_page: pg_info.bi.absgp,
414 stream_serial: str_serial,
415 });
416 }
417
418 /// Pushes a given Ogg page, updating the internal structures
419 /// with its contents.
420 ///
421 /// If you want the code to function properly, you should first call
422 /// `parse_segments`, then `parse_packet_data` on a `PageParser`
423 /// before passing the resulting `OggPage` to this function.
push_page(&mut self, page :OggPage) -> Result<(), OggReadError>424 pub fn push_page(&mut self, page :OggPage) -> Result<(), OggReadError> {
425 let mut pg_prs = page.0;
426 match self.page_infos.entry(pg_prs.stream_serial) {
427 Entry::Occupied(mut o) => {
428 let inf = o.get_mut();
429 if pg_prs.bi.first_page {
430 tri!(Err(OggReadError::InvalidData));
431 }
432 if pg_prs.bi.starts_with_continued != inf.bi.ends_with_continued {
433 if !self.has_seeked {
434 tri!(Err(OggReadError::InvalidData));
435 } else {
436 // If we have seeked, we are more tolerant here,
437 // and just drop the continued packet's content.
438
439 inf.last_overlap_pck.clear();
440 if pg_prs.bi.starts_with_continued {
441 pg_prs.bi.packet_positions.remove(0);
442 if pg_prs.packet_count != 0 {
443 // Decrease packet count by one. Normal case.
444 pg_prs.packet_count -= 1;
445 } else {
446 // If the packet count is 0, this means
447 // that we start and end with the same continued packet.
448 // So now as we ignore that packet, we must clear the
449 // ends_with_continued state as well.
450 pg_prs.bi.ends_with_continued = false;
451 }
452 }
453 }
454 } else if pg_prs.bi.starts_with_continued {
455 // Remember the packet at the end so that it can be glued together once
456 // we encounter the next segment with length < 255 (doesnt have to be in this page)
457 let (offs, len) = inf.bi.packet_positions[inf.packet_idx as usize];
458 if len as usize != inf.page_body.len() {
459 let mut tmp = Vec::with_capacity(len as usize);
460 tmp.write_all(&inf.page_body[offs as usize .. (offs + len) as usize]).unwrap();
461 inf.last_overlap_pck.push(tmp);
462 } else {
463 // Little optimisation: don't copy if not neccessary
464 inf.last_overlap_pck.push(replace(&mut inf.page_body, vec![0;0]));
465 }
466
467 }
468 inf.bi = pg_prs.bi;
469 inf.packet_idx = 0;
470 inf.page_body = pg_prs.segments_or_packets_buf;
471 },
472 Entry::Vacant(v) => {
473 if !self.has_seeked {
474 if !pg_prs.bi.first_page || pg_prs.bi.starts_with_continued {
475 // If we haven't seeked, this is an error.
476 tri!(Err(OggReadError::InvalidData));
477 }
478 } else {
479 if !pg_prs.bi.first_page {
480 // we can just ignore this.
481 }
482 if pg_prs.bi.starts_with_continued {
483 // Ignore the continued packet's content.
484 // This is a normal occurence if we have just seeked.
485 pg_prs.bi.packet_positions.remove(0);
486 if pg_prs.packet_count != 0 {
487 // Decrease packet count by one. Normal case.
488 pg_prs.packet_count -= 1;
489 } else {
490 // If the packet count is 0, this means
491 // that we start and end with the same continued packet.
492 // So now as we ignore that packet, we must clear the
493 // ends_with_continued state as well.
494 pg_prs.bi.ends_with_continued = false;
495 }
496 // Not actually needed, but good for consistency
497 pg_prs.bi.starts_with_continued = false;
498 }
499 }
500 v.insert(PageInfo {
501 bi : pg_prs.bi,
502 packet_idx: 0,
503 page_body: pg_prs.segments_or_packets_buf,
504 last_overlap_pck: Vec::new(),
505 });
506 },
507 }
508 let pg_has_stuff :bool = pg_prs.packet_count > 0;
509
510 if pg_has_stuff {
511 self.stream_with_stuff = Some(pg_prs.stream_serial);
512 } else {
513 self.stream_with_stuff = None;
514 }
515
516 return Ok(());
517 }
518
519 /// Reset the internal state after a seek
520 ///
521 /// It flushes the cache so that no partial data is left inside.
522 /// It also tells the parsing logic to expect little inconsistencies
523 /// due to the read position not being at the start.
update_after_seek(&mut self)524 pub fn update_after_seek(&mut self) {
525 self.stream_with_stuff = None;
526 self.page_infos = HashMap::new();
527 self.has_seeked = true;
528 }
529 }
530
/// State of the capture pattern search of an `UntilPageHeaderReader`.
#[derive(Clone, Copy)]
enum UntilPageHeaderReaderMode {
	/// Still looking for the "OggS" capture pattern.
	Searching,
	/// The capture pattern was found; the given number of header
	/// bytes still has to be read.
	FoundWithNeeded(u8),
	/// More than the header has been read; the reader has to seek by the
	/// given (negative) offset, relative to its current position, to end up
	/// right after the header.
	SeekNeeded(i32),
	/// All 27 header bytes are present in the return buffer.
	Found,
}
538
/// Outcome of one `do_read`/`do_seek` step of an `UntilPageHeaderReader`.
enum UntilPageHeaderResult {
	/// The reader was at its end before any byte was read.
	Eof,
	/// The full page header is available via `into_header`.
	Found,
	/// Another `do_read` call is needed.
	ReadNeeded,
	/// A `do_seek` call is needed to move back to right after the header.
	SeekNeeded,
}
545
/// Incremental reader that searches for the next Ogg capture pattern and
/// collects the 27 byte page header starting with it.
struct UntilPageHeaderReader {
	/// Current search state.
	mode :UntilPageHeaderReaderMode,
	/// Capture pattern offset. Needed so that if we only partially
	/// recognized the capture pattern, we later on only check the
	/// remaining part.
	cpt_of :u8,
	/// The return buffer.
	ret_buf :[u8; 27],
	/// Total number of bytes read so far; limits the first reads to the
	/// header size and bounds the overall search.
	read_amount :usize,
}
556
557 impl UntilPageHeaderReader {
new() -> Self558 pub fn new() -> Self {
559 UntilPageHeaderReader {
560 mode : UntilPageHeaderReaderMode::Searching,
561 cpt_of : 0,
562 ret_buf : [0; 27],
563 read_amount : 0,
564 }
565 }
566 /// Returns Some(off), where off is the offset of the last byte
567 /// of the capture pattern if it's found, None if the capture pattern
568 /// is not inside the passed slice.
569 ///
570 /// Changes the capture pattern offset accordingly
check_arr(&mut self, arr :&[u8]) -> Option<usize>571 fn check_arr(&mut self, arr :&[u8]) -> Option<usize> {
572 for (i, ch) in arr.iter().enumerate() {
573 match *ch {
574 b'O' => self.cpt_of = 1,
575 b'g' if self.cpt_of == 1 || self.cpt_of == 2 => self.cpt_of += 1,
576 b'S' if self.cpt_of == 3 => return Some(i),
577 _ => self.cpt_of = 0,
578 }
579 }
580 return None;
581 }
582 /// Do one read exactly, and if it was successful,
583 /// return Ok(true) if the full header has been read and can be extracted with
584 ///
585 /// or return Ok(false) if the
do_read<R :Read>(&mut self, mut rdr :R) -> Result<UntilPageHeaderResult, OggReadError>586 pub fn do_read<R :Read>(&mut self, mut rdr :R)
587 -> Result<UntilPageHeaderResult, OggReadError> {
588 use self::UntilPageHeaderReaderMode::*;
589 use self::UntilPageHeaderResult as Res;
590 // The array's size is freely choseable, but must be > 27,
591 // and must well fit into an i32 (needs to be stored in SeekNeeded)
592 let mut buf :[u8; 1024] = [0; 1024];
593
594 let rd_len = tri!(rdr.read(if self.read_amount < 27 {
595 // This is an optimisation for the most likely case:
596 // the next page directly follows the current read position.
597 // Then it would be a waste to read more than the needed amount.
598 &mut buf[0 .. 27 - self.read_amount]
599 } else {
600 match self.mode {
601 Searching => &mut buf,
602 FoundWithNeeded(amount) => &mut buf[0 .. amount as usize],
603 SeekNeeded(_) => return Ok(Res::SeekNeeded),
604 Found => return Ok(Res::Found),
605 }
606 }));
607
608 if rd_len == 0 {
609 // Reached EOF.
610 if self.read_amount == 0 {
611 // If we have read nothing yet, there is no data
612 // but ogg data, meaning the stream ends legally
613 // and without corruption.
614 return Ok(Res::Eof);
615 } else {
616 // There is most likely a corruption here.
617 // I'm not sure, but the ogg spec doesn't say that
618 // random data past the last ogg page is allowed,
619 // so we just assume it's not allowed.
620 tri!(Err(OggReadError::NoCapturePatternFound));
621 }
622 }
623 self.read_amount += rd_len;
624
625 // 150 kb gives us a bit of safety: we can survive
626 // up to one page with a corrupted capture pattern
627 // after having seeked right after a capture pattern
628 // of an earlier page.
629 let read_amount_max = 150 * 1024;
630 if self.read_amount > read_amount_max {
631 // Exhaustive searching for the capture pattern
632 // has returned no ogg capture pattern.
633 tri!(Err(OggReadError::NoCapturePatternFound));
634 }
635
636 let rd_buf = &buf[0 .. rd_len];
637
638 use std::cmp::min;
639 let (off, needed) = match self.mode {
640 Searching => match self.check_arr(rd_buf) {
641 // Capture pattern found
642 Some(off) => {
643 self.ret_buf[0] = b'O';
644 self.ret_buf[1] = b'g';
645 self.ret_buf[2] = b'g';
646 self.ret_buf[3] = b'S'; // (Not actually needed)
647 (off, 24)
648 },
649 // Nothing found
650 None => return Ok(Res::ReadNeeded),
651 },
652 FoundWithNeeded(needed) => {
653 (0, needed as usize)
654 },
655 _ => unimplemented!(),
656 };
657
658 let fnd_buf = &rd_buf[off..];
659
660 let copy_amount = min(needed, fnd_buf.len());
661 let start_fill = 27 - needed;
662 (&mut self.ret_buf[start_fill .. copy_amount + start_fill])
663 .copy_from_slice(&fnd_buf[0 .. copy_amount]);
664 if fnd_buf.len() == needed {
665 // Capture pattern found!
666 self.mode = Found;
667 return Ok(Res::Found);
668 } else if fnd_buf.len() < needed {
669 // We still have to read some content.
670 let needed_new = needed - copy_amount;
671 self.mode = FoundWithNeeded(needed_new as u8);
672 return Ok(Res::ReadNeeded);
673 } else {
674 // We have read too much content (exceeding the header).
675 // Seek back so that we are at the position
676 // right after the header.
677
678 self.mode = SeekNeeded(needed as i32 - fnd_buf.len() as i32);
679 return Ok(Res::SeekNeeded);
680 }
681 }
do_seek<S :Seek>(&mut self, mut skr :S) -> Result<UntilPageHeaderResult, OggReadError>682 pub fn do_seek<S :Seek>(&mut self, mut skr :S)
683 -> Result<UntilPageHeaderResult, OggReadError> {
684 use self::UntilPageHeaderReaderMode::*;
685 use self::UntilPageHeaderResult as Res;
686 match self.mode {
687 Searching | FoundWithNeeded(_) => Ok(Res::ReadNeeded),
688 SeekNeeded(offs) => {
689 tri!(skr.seek(SeekFrom::Current(offs as i64)));
690 self.mode = Found;
691 Ok(Res::Found)
692 },
693 Found => Ok(Res::Found),
694 }
695 }
into_header(self) -> [u8; 27]696 pub fn into_header(self) -> [u8; 27] {
697 use self::UntilPageHeaderReaderMode::*;
698 match self.mode {
699 Found => self.ret_buf,
700 _ => panic!("wrong mode"),
701 }
702 }
703 }
704
705 /**
706 Reader for packets from an Ogg stream.
707
708 This reads codec packets belonging to several different logical streams from one physical Ogg container stream.
709
710 This reader is not async ready. It does not keep its internal state
711 consistent when it encounters the `WouldBlock` error kind.
712 If you desire async functionality, consider enabling the `async` feature
713 and look into the async module.
714 */
715 pub struct PacketReader<T :io::Read + io::Seek> {
716 rdr :T,
717
718 base_pck_rdr :BasePacketReader,
719 }
720
721 impl<T :io::Read + io::Seek> PacketReader<T> {
722 /// Constructs a new `PacketReader` with a given `Read`.
new(rdr :T) -> PacketReader<T>723 pub fn new(rdr :T) -> PacketReader<T> {
724 PacketReader { rdr, base_pck_rdr : BasePacketReader::new() }
725 }
726 /// Returns the wrapped reader, consuming the `PacketReader`.
into_inner(self) -> T727 pub fn into_inner(self) -> T {
728 self.rdr
729 }
730 /// Reads a packet, and returns it on success.
731 ///
732 /// Ok(None) is returned if the physical stream has ended.
read_packet(&mut self) -> Result<Option<Packet>, OggReadError>733 pub fn read_packet(&mut self) -> Result<Option<Packet>, OggReadError> {
734 // Read pages until we got a valid entire packet
735 // (packets may span multiple pages, so reading one page
736 // doesn't always suffice to give us a valid packet)
737 loop {
738 if let Some(pck) = self.base_pck_rdr.read_packet() {
739 return Ok(Some(pck));
740 }
741 let page = tri!(self.read_ogg_page());
742 match page {
743 Some(page) => tri!(self.base_pck_rdr.push_page(page)),
744 None => return Ok(None),
745 }
746 }
747 }
748 /// Reads a packet, and returns it on success.
749 ///
750 /// The difference to the `read_packet` function is that this function
751 /// returns an Err(_) if the physical stream has ended.
752 /// This function is useful if you expect a new packet to come.
read_packet_expected(&mut self) -> Result<Packet, OggReadError>753 pub fn read_packet_expected(&mut self) -> Result<Packet, OggReadError> {
754 match tri!(self.read_packet()) {
755 Some(p) => Ok(p),
756 None => tri!(Err(Error::new(ErrorKind::UnexpectedEof,
757 "Expected ogg packet but found end of physical stream"))),
758 }
759 }
760
761 /// Reads until the new page header, and then returns the page header array.
762 ///
763 /// If no new page header is immediately found, it performs a "recapture",
764 /// meaning it searches for the capture pattern, and if it finds it, it
765 /// reads the complete first 27 bytes of the header, and returns them.
766 ///
767 /// Ok(None) is returned if the stream has ended without an uncompleted page
768 /// or non page data after the last page (if any) present.
read_until_pg_header(&mut self) -> Result<Option<[u8; 27]>, OggReadError>769 fn read_until_pg_header(&mut self) -> Result<Option<[u8; 27]>, OggReadError> {
770 let mut r = UntilPageHeaderReader::new();
771 use self::UntilPageHeaderResult::*;
772 let mut res = tri!(r.do_read(&mut self.rdr));
773 loop {
774 res = match res {
775 Eof => return Ok(None),
776 Found => break,
777 ReadNeeded => tri!(r.do_read(&mut self.rdr)),
778 SeekNeeded => tri!(r.do_seek(&mut self.rdr))
779 }
780 }
781 Ok(Some(r.into_header()))
782 }
783
784 /// Parses and reads a new OGG page
785 ///
786 /// To support seeking this does not assume that the capture pattern
787 /// is at the current reader position.
788 /// Instead it searches until it finds the capture pattern.
read_ogg_page(&mut self) -> Result<Option<OggPage>, OggReadError>789 fn read_ogg_page(&mut self) -> Result<Option<OggPage>, OggReadError> {
790 let header_buf :[u8; 27] = match tri!(self.read_until_pg_header()) {
791 Some(s) => s,
792 None => return Ok(None)
793 };
794 let (mut pg_prs, page_segments) = tri!(PageParser::new(header_buf));
795
796 let mut segments_buf = vec![0; page_segments]; // TODO fix this, we initialize memory for NOTHING!!! Out of some reason, this is seen as "unsafe" by rustc.
797 tri!(self.rdr.read_exact(&mut segments_buf));
798
799 let page_siz = pg_prs.parse_segments(segments_buf);
800
801 let mut packet_data = vec![0; page_siz as usize];
802 tri!(self.rdr.read_exact(&mut packet_data));
803
804 Ok(Some(tri!(pg_prs.parse_packet_data(packet_data))))
805 }
806
807 /// Seeks the underlying reader
808 ///
809 /// Seeks the reader that this PacketReader bases on by the specified
810 /// number of bytes. All new pages will be read from the new position.
811 ///
812 /// This also flushes all the unread packets in the queue.
seek_bytes(&mut self, pos :SeekFrom) -> Result<u64, Error>813 pub fn seek_bytes(&mut self, pos :SeekFrom) -> Result<u64, Error> {
814 let r = tri!(self.rdr.seek(pos));
815 // Reset the internal state
816 self.base_pck_rdr.update_after_seek();
817 return Ok(r);
818 }
819
820 /// Seeks to absolute granule pos
821 ///
822 /// More specifically, it seeks to the first Ogg page
823 /// that has an `absgp` greater or equal to the specified one.
824 /// In the case of continued packets, the seek operation may also end up
825 /// at the last page that comes before such a page and has a packet start.
826 ///
827 /// The passed `stream_serial` parameter controls the stream
828 /// serial number to filter our search for. If it's `None`, no
829 /// filtering is applied, but if it is `Some(n)`, we filter for
830 /// streams with the serial number `n`.
831 /// Note that the `None` case is only intended for streams
832 /// where only one logical stream exists, the seek may misbehave
833 /// if `Ǹone` gets passed when multiple streams exist.
834 ///
835 /// The returned bool indicates whether the seek was successful.
seek_absgp(&mut self, stream_serial :Option<u32>, pos_goal :u64) -> Result<bool, OggReadError>836 pub fn seek_absgp(&mut self, stream_serial :Option<u32>,
837 pos_goal :u64) -> Result<bool, OggReadError> {
838 macro_rules! found {
839 ($pos:expr) => {{
840 // println!("found: {}", $pos);
841 tri!(self.rdr.seek(SeekFrom::Start($pos)));
842 self.base_pck_rdr.update_after_seek();
843 return Ok(true);
844 }};
845 }
846 macro_rules! bt {
847 ($e:expr) => {{
848 match tri!($e) {
849 Some(s) => s,
850 None => return Ok(false),
851 }
852 }};
853 }
854 // The task of this macro is to read to the
855 // end of the logical stream. For optimisation reasons,
856 // it returns early if we found our goal
857 // or any page past it.
858 macro_rules! pg_read_until_end_or_goal {
859 {$goal:expr} => {{
860 let mut pos;
861 let mut pg;
862 loop {
863 let (n_pos, n_pg) = pg_read_match_serial!();
864 pos = n_pos;
865 pg = n_pg;
866 // If the absgp matches our goal, the seek process is done.
867 // This is a nice shortcut as we don't need to perform
868 // the remainder of the seek process any more.
869 // Of course, an exact match only happens in the fewest
870 // of cases
871 if pg.0.bi.absgp == $goal {
872 found!(pos);
873 }
874 // If we found a page past our goal, we already
875 // found a position that can serve as end post of the search.
876 if pg.0.bi.absgp > $goal {
877 break;
878 }
879 // Stop the search if the stream has ended.
880 if pg.0.bi.last_page {
881 return Ok(false)
882 }
883 // If the page is not interesting, seek over it.
884 }
885 (pos, pg)
886 }};
887 }
888 macro_rules! pg_read_match_serial {
889 {} => {{
890 let mut pos;
891 let mut pg;
892 let mut continued_pck_start = None;
893 loop {
894 pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
895 pg = bt!(self.read_ogg_page());
896 /*println!("absgp {} serial {} wh {} pe {} @ {}",
897 pg.0.bi.absgp, pg.0.bi.sequence_num,
898 pg.has_whole_packet(), pg.has_packet_end(), pos);// */
899
900 match stream_serial {
901 // Continue the search if we encounter a
902 // page with a different stream serial
903 Some(s) if pg.0.stream_serial != s => (),
904 _ => match continued_pck_start {
905 None if pg.has_whole_packet() => break,
906 None if pg.has_packet_start() => {
907 continued_pck_start = Some(pos);
908 },
909 Some(s) if pg.has_packet_end() => {
910 // We have remembered a packet start,
911 // and have just encountered a packet end.
912 // Return the position of the start with the
913 // info from the end (for the absgp).
914 pos = s;
915 break;
916 },
917 _ => (),
918 },
919 }
920 }
921 (pos, pg)
922 }};
923 }
924
925 // Bisect seeking algo.
926 // Start by finding boundaries, e.g. at the start and
927 // end of the file, then bisect those boundaries successively
928 // until a page is found.
929
930 //println!("seek start. goal = {}", pos_goal);
931 let ab_of = |pg :&OggPage| { pg.0.bi.absgp };
932 let seq_of = |pg :&OggPage| { pg.0.bi.sequence_num };
933
934 // First, find initial "boundaries"
935 // Seek to the start of the file to get the starting boundary
936 tri!(self.rdr.seek(SeekFrom::Start(0)));
937 let (mut begin_pos, mut begin_pg) = pg_read_match_serial!();
938
939 // If the goal is the beginning, we are done.
940 if pos_goal == 0 {
941 //println!("Seeking to the beginning of the stream - skipping bisect.");
942 found!(begin_pos);
943 }
944
945 // Seek to the end of the file to get the ending boundary
946 // TODO the 200 KB is just a guessed number, any ideas
947 // to improve it?
948 tri!(seek_before_end(&mut self.rdr, 200 * 1024));
949 let (mut end_pos, mut end_pg) = pg_read_until_end_or_goal!(pos_goal);
950
951 // Then perform the bisection
952 loop {
953 // Search is done if the two limits are the same page,
954 // or consecutive pages.
955 if seq_of(&end_pg) - seq_of(&begin_pg) <= 1 {
956 found!(end_pos);
957 }
958 // Perform the bisection step
959 let pos_to_seek = begin_pos + (end_pos - begin_pos) / 2;
960 tri!(self.rdr.seek(SeekFrom::Start(pos_to_seek)));
961 let (pos, pg) = pg_read_match_serial!();
962 /*println!("seek {} {} . {} @ {} {} . {}",
963 ab_of(&begin_pg), ab_of(&end_pg), ab_of(&pg),
964 begin_pos, end_pos, pos);// */
965
966 if seq_of(&end_pg) == seq_of(&pg) ||
967 seq_of(&begin_pg) == seq_of(&pg) {
968 //println!("switching to linear.");
969 // The bisection seek doesn't bring us any further.
970 // Switch to a linear seek to get the last details.
971 let mut pos;
972 let mut pg;
973 let mut last_packet_end_pos = begin_pos;
974 tri!(self.rdr.seek(SeekFrom::Start(begin_pos)));
975 loop {
976 pos = tri!(self.rdr.seek(SeekFrom::Current(0)));
977 pg = bt!(self.read_ogg_page());
978 /*println!("absgp {} pck_start {} whole_pck {} pck_end {} @ {} {}",
979 ab_of(&pg), pg.has_packet_start(), pg.has_whole_packet(),
980 pg.has_packet_end(),
981 pos, last_packet_end_pos);// */
982 match stream_serial {
983 // Continue the search if we encounter a
984 // page with a different stream serial,
985 // or one with an absgp of -1.
986 Some(s) if pg.0.stream_serial != s => (),
987 _ if ab_of(&pg) == -1i64 as u64 => (),
988 // The page is found if the absgp is >= our goal
989 _ if ab_of(&pg) >= pos_goal => found!(last_packet_end_pos),
990 // If we encounter a page with a packet start,
991 // update accordingly.
992 _ => if pg.has_packet_end() {
993 last_packet_end_pos = pos;
994 },
995 }
996 }
997 }
998 if ab_of(&pg) >= pos_goal {
999 end_pos = pos;
1000 end_pg = pg;
1001 } else {
1002 begin_pos = pos;
1003 begin_pg = pg;
1004 }
1005 }
1006 }
1007 /// Resets the internal state by deleting all
1008 /// unread packets.
delete_unread_packets(&mut self)1009 pub fn delete_unread_packets(&mut self) {
1010 self.base_pck_rdr.update_after_seek();
1011 }
1012 }
1013
1014 // util function
seek_before_end<T :io::Read + io::Seek>(mut rdr :T, offs :u64) -> Result<u64, OggReadError>1015 fn seek_before_end<T :io::Read + io::Seek>(mut rdr :T,
1016 offs :u64) -> Result<u64, OggReadError> {
1017 let end_pos = tri!(rdr.seek(SeekFrom::End(0)));
1018 let end_pos_to_seek = ::std::cmp::min(end_pos, offs);
1019 return Ok(tri!(rdr.seek(SeekFrom::End(-(end_pos_to_seek as i64)))));
1020 }
1021
#[cfg(feature = "async")]
/**
Asynchronous ogg decoding
*/
pub mod async_api {
    // NOTE(review): presumably needed because the `tokio_io::codec` items
    // used below are deprecated — confirm before removing.
    #![allow(deprecated)]

    use super::*;
    use tokio_io::AsyncRead;
    use tokio_io::codec::{Decoder, FramedRead};
    use futures::stream::Stream;
    use futures::{Async, Poll};
    use bytes::BytesMut;

    /// Parsing stage of `PageDecoder`.
    ///
    /// Each stage needs a known number of buffered bytes before it can advance.
    enum PageDecodeState {
        /// Waiting for the 27 byte fixed page header.
        Head,
        /// Waiting for the given number of bytes to hand to `parse_segments`.
        Segments(PageParser, usize),
        /// Waiting for the given number of bytes to hand to `parse_packet_data`.
        PacketData(PageParser, usize),
    }

    impl PageDecodeState {
        /// Number of bytes that must be buffered before this state can advance.
        fn needed_size(&self) -> usize {
            match *self {
                PageDecodeState::Head => 27,
                PageDecodeState::Segments(_, s) => s,
                PageDecodeState::PacketData(_, s) => s,
            }
        }
    }

    /**
    Async page reading functionality.
    */
    struct PageDecoder {
        state : PageDecodeState,
    }

    impl PageDecoder {
        /// Creates a decoder that expects a page header first.
        fn new() -> Self {
            PageDecoder {
                state : PageDecodeState::Head,
            }
        }
    }

    impl Decoder for PageDecoder {
        type Item = OggPage;
        type Error = OggReadError;

        /// Advances the page parser as far as the buffered bytes allow.
        ///
        /// Returns:
        /// - `Ok(Some(page))` once a complete page has been parsed;
        /// - `Ok(None)` if more bytes are needed;
        /// - `Err` if the header or packet data fails to parse.
        fn decode(&mut self, buf :&mut BytesMut) ->
                Result<Option<OggPage>, OggReadError> {
            use self::PageDecodeState::*;
            loop {
                let needed_size = self.state.needed_size();
                if buf.len() < needed_size {
                    return Ok(None);
                }
                let consumed_buf = buf.split_to(needed_size).to_vec();

                // Take the current state out, leaving `Head` in its place.
                // If a parse step below fails, `tri!` returns early and the
                // decoder stays at `Head`, so the following bytes are parsed
                // as a new page header. Previously an `InUpdate` placeholder
                // was left behind here, and every later call panicked.
                match ::std::mem::replace(&mut self.state, Head) {
                    Head => {
                        let mut hdr_buf = [0; 27];
                        // TODO once we have const generics, the copy below can be done
                        // much nicer, maybe with a new into_array fn on Vec's
                        hdr_buf.copy_from_slice(&consumed_buf);
                        let (pg_prs, segments_len) = tri!(PageParser::new(hdr_buf));
                        self.state = Segments(pg_prs, segments_len);
                    },
                    Segments(mut pg_prs, _) => {
                        let data_len = pg_prs.parse_segments(consumed_buf);
                        self.state = PacketData(pg_prs, data_len);
                    },
                    PacketData(pg_prs, _) => {
                        // `self.state` is already `Head`, ready for the next page.
                        let page = tri!(pg_prs.parse_packet_data(consumed_buf));
                        return Ok(Some(page));
                    },
                }
            }
        }

        /// Ugly hack for the "bytes remaining on stream" error.
        ///
        /// Delegating to `decode` means leftover bytes that do not form a
        /// complete page yield `Ok(None)` instead of an error.
        fn decode_eof(&mut self, buf :&mut BytesMut) ->
                Result<Option<OggPage>, OggReadError> {
            self.decode(buf)
        }
    }

    /**
    Async packet reading functionality.

    A `Stream` of `Packet`s assembled from the Ogg pages read from the
    wrapped `AsyncRead`.
    */
    pub struct PacketReader<T> where T :AsyncRead {
        base_pck_rdr :BasePacketReader,
        pg_rd :FramedRead<T, PageDecoder>,
    }

    impl<T :AsyncRead> PacketReader<T> {
        /// Wraps `inner` in a packet reader.
        pub fn new(inner :T) -> Self {
            PacketReader {
                base_pck_rdr : BasePacketReader::new(),
                pg_rd : FramedRead::new(inner, PageDecoder::new()),
            }
        }
    }

    impl<T :AsyncRead> Stream for PacketReader<T> {
        type Item = Packet;
        type Error = OggReadError;

        fn poll(&mut self) -> Poll<Option<Packet>, OggReadError> {
            // Read pages until we got a valid entire packet
            // (packets may span multiple pages, so reading one page
            // doesn't always suffice to give us a valid packet)
            loop {
                if let Some(pck) = self.base_pck_rdr.read_packet() {
                    return Ok(Async::Ready(Some(pck)));
                }
                let page = try_ready!(self.pg_rd.poll());
                match page {
                    Some(page) => tri!(self.base_pck_rdr.push_page(page)),
                    None => return Ok(Async::Ready(None)),
                }
            }
        }
    }

}
1155