use std::prelude::v1::*; use std::any::Any; use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe}; use std::mem; use super::super::{Poll, Async}; use super::Stream; /// Stream for the `catch_unwind` combinator. /// /// This is created by the `Stream::catch_unwind` method. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct CatchUnwind where S: Stream { state: CatchUnwindState, } pub fn new(stream: S) -> CatchUnwind where S: Stream + UnwindSafe, { CatchUnwind { state: CatchUnwindState::Stream(stream), } } #[derive(Debug)] enum CatchUnwindState { Stream(S), Eof, Done, } impl Stream for CatchUnwind where S: Stream + UnwindSafe, { type Item = Result; type Error = Box; fn poll(&mut self) -> Poll, Self::Error> { let mut stream = match mem::replace(&mut self.state, CatchUnwindState::Eof) { CatchUnwindState::Done => panic!("cannot poll after eof"), CatchUnwindState::Eof => { self.state = CatchUnwindState::Done; return Ok(Async::Ready(None)); } CatchUnwindState::Stream(stream) => stream, }; let res = catch_unwind(|| (stream.poll(), stream)); match res { Err(e) => Err(e), // and state is already Eof Ok((poll, stream)) => { self.state = CatchUnwindState::Stream(stream); match poll { Err(e) => Ok(Async::Ready(Some(Err(e)))), Ok(Async::NotReady) => Ok(Async::NotReady), Ok(Async::Ready(Some(r))) => Ok(Async::Ready(Some(Ok(r)))), Ok(Async::Ready(None)) => Ok(Async::Ready(None)), } } } } } impl Stream for AssertUnwindSafe { type Item = S::Item; type Error = S::Error; fn poll(&mut self) -> Poll, S::Error> { self.0.poll() } }