1 use std::cell::Cell;
2 use std::collections::VecDeque;
3 use std::io;
4 use std::os::unix::io::AsRawFd;
5 use std::rc::Rc;
6 use std::sync::{Arc, Mutex};
7 
8 use nix::poll::{poll, PollFd, PollFlags};
9 
10 use wayland_commons::map::ObjectMap;
11 use wayland_commons::wire::{Argument, Message};
12 
13 use super::connection::{Connection, Error as CError};
14 use super::proxy::{ObjectMeta, ProxyInner};
15 use super::Dispatched;
16 
17 use crate::{AnonymousObject, DispatchData, Filter, Main, RawEvent};
18 
19 pub(crate) type QueueBuffer = Arc<Mutex<VecDeque<Message>>>;
20 
create_queue_buffer() -> QueueBuffer21 pub(crate) fn create_queue_buffer() -> QueueBuffer {
22     Arc::new(Mutex::new(VecDeque::new()))
23 }
24 
25 pub(crate) struct EventQueueInner {
26     pub(crate) connection: Arc<Mutex<Connection>>,
27     pub(crate) map: Arc<Mutex<ObjectMap<ObjectMeta>>>,
28     pub(crate) buffer: QueueBuffer,
29     display_buffer: QueueBuffer,
30 }
31 
32 impl EventQueueInner {
new( connection: Arc<Mutex<Connection>>, buffer: Option<QueueBuffer>, ) -> EventQueueInner33     pub(crate) fn new(
34         connection: Arc<Mutex<Connection>>,
35         buffer: Option<QueueBuffer>,
36     ) -> EventQueueInner {
37         let (map, display_buffer) = {
38             let cx = connection.lock().unwrap();
39             (cx.map.clone(), cx.display_buffer.clone())
40         };
41         EventQueueInner {
42             connection,
43             map,
44             buffer: buffer.unwrap_or_else(create_queue_buffer),
45             display_buffer,
46         }
47     }
48 
dispatch<F>(&self, mut data: DispatchData, mut fallback: F) -> io::Result<u32> where F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),49     pub(crate) fn dispatch<F>(&self, mut data: DispatchData, mut fallback: F) -> io::Result<u32>
50     where
51         F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),
52     {
53         // don't read events if there are some pending
54         if let Err(()) = self.prepare_read() {
55             return self.dispatch_pending(data.reborrow(), &mut fallback);
56         }
57 
58         // temporarily retrieve the socket Fd, only using it for POLL-ing!
59         let socket_fd;
60         {
61             // Flush the outgoing socket
62             let mut conn_lock = self.connection.lock().unwrap();
63             socket_fd = conn_lock.socket.get_socket().as_raw_fd();
64             loop {
65                 match conn_lock.flush() {
66                     Ok(_) => break,
67                     Err(::nix::Error::Sys(::nix::errno::Errno::EAGAIN)) => {
68                         // EAGAIN, we need to wait before writing, so we poll the socket
69                         let poll_ret = poll(&mut [PollFd::new(socket_fd, PollFlags::POLLOUT)], -1);
70                         match poll_ret {
71                             Ok(_) => continue,
72                             Err(::nix::Error::Sys(e)) => {
73                                 self.cancel_read();
74                                 return Err(e.into());
75                             }
76                             Err(_) => unreachable!(),
77                         }
78                     }
79                     Err(::nix::Error::Sys(e)) => {
80                         if e != ::nix::errno::Errno::EPIPE {
81                             // don't abort on EPIPE, so we can continue reading
82                             // to get the protocol error
83                             self.cancel_read();
84                             return Err(e.into());
85                         }
86                     }
87                     Err(_) => unreachable!(),
88                 }
89             }
90         }
91 
92         // wait for incoming messages to arrive
93         match poll(&mut [PollFd::new(socket_fd, PollFlags::POLLIN)], -1) {
94             Ok(_) => (),
95             Err(::nix::Error::Sys(e)) => {
96                 self.cancel_read();
97                 return Err(e.into());
98             }
99             Err(_) => unreachable!(),
100         }
101         let read_ret = self.read_events();
102 
103         // even if read_events returned an error, it may have queued messages the need dispatching
104         // so we dispatch them
105         let dispatch_ret = self.dispatch_pending(data.reborrow(), &mut fallback);
106 
107         match read_ret {
108             Ok(()) => (),
109             Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
110                 // we waited for read readiness be then received a WouldBlock error
111                 // this means that an other thread was also reading events and read them
112                 // under our nose
113                 // this is alright, continue
114             }
115             Err(e) => return Err(e),
116         }
117 
118         dispatch_ret
119     }
120 
dispatch_buffer<F>( &self, buffer: &Mutex<VecDeque<Message>>, mut data: DispatchData, mut fallback: F, ) -> io::Result<u32> where F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),121     fn dispatch_buffer<F>(
122         &self,
123         buffer: &Mutex<VecDeque<Message>>,
124         mut data: DispatchData,
125         mut fallback: F,
126     ) -> io::Result<u32>
127     where
128         F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),
129     {
130         let mut count = 0;
131         let mut proxymap = super::ProxyMap::make(self.map.clone(), self.connection.clone());
132         loop {
133             let msg = { buffer.lock().unwrap().pop_front() };
134             let msg = match msg {
135                 Some(m) => m,
136                 None => break,
137             };
138             let id = msg.sender_id;
139             if let Some(proxy) = ProxyInner::from_id(id, self.map.clone(), self.connection.clone())
140             {
141                 let object = proxy.object.clone();
142                 if object.meta.client_destroyed {
143                     // This is a potential race, if we reach here it means that the proxy was
144                     // destroyed by the user between this message was queued and now. To handle it
145                     // correctly, we must close any FDs it contains, mark any child object as
146                     // destroyed (but the server will never know about it, so the ids will be
147                     // leaked) and discard the event.
148                     for arg in msg.args {
149                         match arg {
150                             Argument::Fd(fd) => {
151                                 let _ = ::nix::unistd::close(fd);
152                             }
153                             Argument::NewId(id) => {
154                                 let mut map = self.map.lock().unwrap();
155                                 map.with(id, |obj| {
156                                     obj.meta.client_destroyed = true;
157                                 })
158                                 .unwrap();
159                             }
160                             _ => {}
161                         }
162                     }
163                     continue;
164                 }
165                 let mut dispatcher = object.meta.dispatcher.lock().unwrap();
166                 match dispatcher.dispatch(msg, proxy, &mut proxymap, data.reborrow()) {
167                     Dispatched::Yes => {
168                         count += 1;
169                     }
170                     Dispatched::NoDispatch(msg, proxy) => {
171                         let raw_event = message_to_rawevent(msg, &proxy, &mut proxymap);
172                         fallback(raw_event, Main::wrap(proxy), data.reborrow());
173                         count += 1;
174                     }
175                     Dispatched::BadMsg => {
176                         return Err(io::Error::new(
177                             io::ErrorKind::Other,
178                             format!("Dispatch for object {}@{} errored.", object.interface, id),
179                         ))
180                     }
181                 }
182             } else {
183                 return Err(io::Error::new(
184                     io::ErrorKind::Other,
185                     format!("Received an event for unknown object {}.", id),
186                 ));
187             }
188         }
189         Ok(count)
190     }
191 
dispatch_pending<F>(&self, mut data: DispatchData, fallback: F) -> io::Result<u32> where F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),192     pub(crate) fn dispatch_pending<F>(&self, mut data: DispatchData, fallback: F) -> io::Result<u32>
193     where
194         F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),
195     {
196         // First always dispatch the display buffer
197         let display_dispatched =
198             self.dispatch_buffer(&self.display_buffer, data.reborrow(), |_, _, _| unreachable!())?;
199 
200         // Then our actual buffer
201         let self_dispatched = self.dispatch_buffer(&self.buffer, data.reborrow(), fallback)?;
202 
203         Ok(display_dispatched + self_dispatched)
204     }
205 
sync_roundtrip<F>( &self, mut data: DispatchData, mut fallback: F, ) -> io::Result<u32> where F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),206     pub(crate) fn sync_roundtrip<F>(
207         &self,
208         mut data: DispatchData,
209         mut fallback: F,
210     ) -> io::Result<u32>
211     where
212         F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),
213     {
214         use crate::protocol::wl_callback::{Event as CbEvent, WlCallback};
215         use crate::protocol::wl_display::{Request as DRequest, WlDisplay};
216         // first retrieve the display and make a wrapper for it in this event queue
217         let mut display =
218             ProxyInner::from_id(1, self.map.clone(), self.connection.clone()).unwrap();
219         display.attach(&self);
220 
221         let done = Rc::new(Cell::new(false));
222         let cb = display.send::<WlDisplay, WlCallback>(DRequest::Sync {}, Some(1)).unwrap();
223         let done2 = done.clone();
224         cb.assign::<WlCallback, _>(Filter::new(move |(_, CbEvent::Done { .. }), _, _| {
225             done2.set(true);
226         }));
227 
228         let mut dispatched = 0;
229 
230         loop {
231             dispatched += self.dispatch(data.reborrow(), &mut fallback)?;
232             if done.get() {
233                 return Ok(dispatched);
234             }
235         }
236     }
237 
prepare_read(&self) -> Result<(), ()>238     pub(crate) fn prepare_read(&self) -> Result<(), ()> {
239         if !self.buffer.lock().unwrap().is_empty() {
240             return Err(());
241         }
242 
243         // TODO: un-mock
244         Ok(())
245     }
246 
read_events(&self) -> io::Result<()>247     pub(crate) fn read_events(&self) -> io::Result<()> {
248         // TODO: integrate more properly with prepare read with a fence
249         match self.connection.lock().unwrap().read_events() {
250             Ok(_) => Ok(()),
251             Err(CError::Protocol(_)) => Err(::nix::errno::Errno::EPROTO.into()),
252             Err(CError::Parse(_)) => Err(::nix::errno::Errno::EPROTO.into()),
253             Err(CError::Nix(::nix::Error::Sys(errno))) => Err(errno.into()),
254             Err(CError::Nix(_)) => unreachable!(),
255         }
256     }
257 
cancel_read(&self)258     pub(crate) fn cancel_read(&self) {
259         // TODO: un-mock
260     }
261 }
262 
message_to_rawevent(msg: Message, proxy: &ProxyInner, map: &mut super::ProxyMap) -> RawEvent263 fn message_to_rawevent(msg: Message, proxy: &ProxyInner, map: &mut super::ProxyMap) -> RawEvent {
264     let Message { opcode, args, .. } = msg;
265 
266     let args = args
267         .into_iter()
268         .map(|a| match a {
269             Argument::Int(i) => crate::Argument::Int(i),
270             Argument::Uint(u) => crate::Argument::Uint(u),
271             Argument::Array(v) => {
272                 crate::Argument::Array(if v.is_empty() { None } else { Some(*v) })
273             }
274             Argument::Fixed(f) => crate::Argument::Float((f as f32) / 256.),
275             Argument::Fd(f) => crate::Argument::Fd(f),
276             Argument::Str(cs) => crate::Argument::Str({
277                 let bytes = cs.into_bytes();
278                 if bytes.is_empty() {
279                     None
280                 } else {
281                     Some(
282                         String::from_utf8(bytes)
283                             .unwrap_or_else(|e| String::from_utf8_lossy(&e.into_bytes()).into()),
284                     )
285                 }
286             }),
287             Argument::Object(id) => crate::Argument::Object(map.get(id)),
288             Argument::NewId(id) => crate::Argument::NewId(map.get_new(id)),
289         })
290         .collect();
291 
292     RawEvent {
293         interface: proxy.object.interface,
294         opcode,
295         name: proxy.object.events[opcode as usize].name,
296         args,
297     }
298 }
299