1 use std::cell::Cell;
2 use std::collections::VecDeque;
3 use std::io;
4 use std::os::unix::io::AsRawFd;
5 use std::rc::Rc;
6 use std::sync::{Arc, Mutex};
7
8 use nix::poll::{poll, PollFd, PollFlags};
9
10 use wayland_commons::map::ObjectMap;
11 use wayland_commons::wire::{Argument, Message};
12
13 use super::connection::{Connection, Error as CError};
14 use super::proxy::{ObjectMeta, ProxyInner};
15 use super::Dispatched;
16
17 use crate::{AnonymousObject, DispatchData, Filter, Main, RawEvent};
18
19 pub(crate) type QueueBuffer = Arc<Mutex<VecDeque<Message>>>;
20
create_queue_buffer() -> QueueBuffer21 pub(crate) fn create_queue_buffer() -> QueueBuffer {
22 Arc::new(Mutex::new(VecDeque::new()))
23 }
24
25 pub(crate) struct EventQueueInner {
26 pub(crate) connection: Arc<Mutex<Connection>>,
27 pub(crate) map: Arc<Mutex<ObjectMap<ObjectMeta>>>,
28 pub(crate) buffer: QueueBuffer,
29 display_buffer: QueueBuffer,
30 }
31
32 impl EventQueueInner {
new( connection: Arc<Mutex<Connection>>, buffer: Option<QueueBuffer>, ) -> EventQueueInner33 pub(crate) fn new(
34 connection: Arc<Mutex<Connection>>,
35 buffer: Option<QueueBuffer>,
36 ) -> EventQueueInner {
37 let (map, display_buffer) = {
38 let cx = connection.lock().unwrap();
39 (cx.map.clone(), cx.display_buffer.clone())
40 };
41 EventQueueInner {
42 connection,
43 map,
44 buffer: buffer.unwrap_or_else(create_queue_buffer),
45 display_buffer,
46 }
47 }
48
dispatch<F>(&self, mut data: DispatchData, mut fallback: F) -> io::Result<u32> where F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),49 pub(crate) fn dispatch<F>(&self, mut data: DispatchData, mut fallback: F) -> io::Result<u32>
50 where
51 F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),
52 {
53 // don't read events if there are some pending
54 if let Err(()) = self.prepare_read() {
55 return self.dispatch_pending(data.reborrow(), &mut fallback);
56 }
57
58 // temporarily retrieve the socket Fd, only using it for POLL-ing!
59 let socket_fd;
60 {
61 // Flush the outgoing socket
62 let mut conn_lock = self.connection.lock().unwrap();
63 socket_fd = conn_lock.socket.get_socket().as_raw_fd();
64 loop {
65 match conn_lock.flush() {
66 Ok(_) => break,
67 Err(::nix::Error::Sys(::nix::errno::Errno::EAGAIN)) => {
68 // EAGAIN, we need to wait before writing, so we poll the socket
69 let poll_ret = poll(&mut [PollFd::new(socket_fd, PollFlags::POLLOUT)], -1);
70 match poll_ret {
71 Ok(_) => continue,
72 Err(::nix::Error::Sys(e)) => {
73 self.cancel_read();
74 return Err(e.into());
75 }
76 Err(_) => unreachable!(),
77 }
78 }
79 Err(::nix::Error::Sys(e)) => {
80 if e != ::nix::errno::Errno::EPIPE {
81 // don't abort on EPIPE, so we can continue reading
82 // to get the protocol error
83 self.cancel_read();
84 return Err(e.into());
85 }
86 }
87 Err(_) => unreachable!(),
88 }
89 }
90 }
91
92 // wait for incoming messages to arrive
93 match poll(&mut [PollFd::new(socket_fd, PollFlags::POLLIN)], -1) {
94 Ok(_) => (),
95 Err(::nix::Error::Sys(e)) => {
96 self.cancel_read();
97 return Err(e.into());
98 }
99 Err(_) => unreachable!(),
100 }
101 let read_ret = self.read_events();
102
103 // even if read_events returned an error, it may have queued messages the need dispatching
104 // so we dispatch them
105 let dispatch_ret = self.dispatch_pending(data.reborrow(), &mut fallback);
106
107 match read_ret {
108 Ok(()) => (),
109 Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
110 // we waited for read readiness be then received a WouldBlock error
111 // this means that an other thread was also reading events and read them
112 // under our nose
113 // this is alright, continue
114 }
115 Err(e) => return Err(e),
116 }
117
118 dispatch_ret
119 }
120
dispatch_buffer<F>( &self, buffer: &Mutex<VecDeque<Message>>, mut data: DispatchData, mut fallback: F, ) -> io::Result<u32> where F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),121 fn dispatch_buffer<F>(
122 &self,
123 buffer: &Mutex<VecDeque<Message>>,
124 mut data: DispatchData,
125 mut fallback: F,
126 ) -> io::Result<u32>
127 where
128 F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),
129 {
130 let mut count = 0;
131 let mut proxymap = super::ProxyMap::make(self.map.clone(), self.connection.clone());
132 loop {
133 let msg = { buffer.lock().unwrap().pop_front() };
134 let msg = match msg {
135 Some(m) => m,
136 None => break,
137 };
138 let id = msg.sender_id;
139 if let Some(proxy) = ProxyInner::from_id(id, self.map.clone(), self.connection.clone())
140 {
141 let object = proxy.object.clone();
142 if object.meta.client_destroyed {
143 // This is a potential race, if we reach here it means that the proxy was
144 // destroyed by the user between this message was queued and now. To handle it
145 // correctly, we must close any FDs it contains, mark any child object as
146 // destroyed (but the server will never know about it, so the ids will be
147 // leaked) and discard the event.
148 for arg in msg.args {
149 match arg {
150 Argument::Fd(fd) => {
151 let _ = ::nix::unistd::close(fd);
152 }
153 Argument::NewId(id) => {
154 let mut map = self.map.lock().unwrap();
155 map.with(id, |obj| {
156 obj.meta.client_destroyed = true;
157 })
158 .unwrap();
159 }
160 _ => {}
161 }
162 }
163 continue;
164 }
165 let mut dispatcher = object.meta.dispatcher.lock().unwrap();
166 match dispatcher.dispatch(msg, proxy, &mut proxymap, data.reborrow()) {
167 Dispatched::Yes => {
168 count += 1;
169 }
170 Dispatched::NoDispatch(msg, proxy) => {
171 let raw_event = message_to_rawevent(msg, &proxy, &mut proxymap);
172 fallback(raw_event, Main::wrap(proxy), data.reborrow());
173 count += 1;
174 }
175 Dispatched::BadMsg => {
176 return Err(io::Error::new(
177 io::ErrorKind::Other,
178 format!("Dispatch for object {}@{} errored.", object.interface, id),
179 ))
180 }
181 }
182 } else {
183 return Err(io::Error::new(
184 io::ErrorKind::Other,
185 format!("Received an event for unknown object {}.", id),
186 ));
187 }
188 }
189 Ok(count)
190 }
191
dispatch_pending<F>(&self, mut data: DispatchData, fallback: F) -> io::Result<u32> where F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),192 pub(crate) fn dispatch_pending<F>(&self, mut data: DispatchData, fallback: F) -> io::Result<u32>
193 where
194 F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),
195 {
196 // First always dispatch the display buffer
197 let display_dispatched =
198 self.dispatch_buffer(&self.display_buffer, data.reborrow(), |_, _, _| unreachable!())?;
199
200 // Then our actual buffer
201 let self_dispatched = self.dispatch_buffer(&self.buffer, data.reborrow(), fallback)?;
202
203 Ok(display_dispatched + self_dispatched)
204 }
205
sync_roundtrip<F>( &self, mut data: DispatchData, mut fallback: F, ) -> io::Result<u32> where F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),206 pub(crate) fn sync_roundtrip<F>(
207 &self,
208 mut data: DispatchData,
209 mut fallback: F,
210 ) -> io::Result<u32>
211 where
212 F: FnMut(RawEvent, Main<AnonymousObject>, DispatchData<'_>),
213 {
214 use crate::protocol::wl_callback::{Event as CbEvent, WlCallback};
215 use crate::protocol::wl_display::{Request as DRequest, WlDisplay};
216 // first retrieve the display and make a wrapper for it in this event queue
217 let mut display =
218 ProxyInner::from_id(1, self.map.clone(), self.connection.clone()).unwrap();
219 display.attach(&self);
220
221 let done = Rc::new(Cell::new(false));
222 let cb = display.send::<WlDisplay, WlCallback>(DRequest::Sync {}, Some(1)).unwrap();
223 let done2 = done.clone();
224 cb.assign::<WlCallback, _>(Filter::new(move |(_, CbEvent::Done { .. }), _, _| {
225 done2.set(true);
226 }));
227
228 let mut dispatched = 0;
229
230 loop {
231 dispatched += self.dispatch(data.reborrow(), &mut fallback)?;
232 if done.get() {
233 return Ok(dispatched);
234 }
235 }
236 }
237
prepare_read(&self) -> Result<(), ()>238 pub(crate) fn prepare_read(&self) -> Result<(), ()> {
239 if !self.buffer.lock().unwrap().is_empty() {
240 return Err(());
241 }
242
243 // TODO: un-mock
244 Ok(())
245 }
246
read_events(&self) -> io::Result<()>247 pub(crate) fn read_events(&self) -> io::Result<()> {
248 // TODO: integrate more properly with prepare read with a fence
249 match self.connection.lock().unwrap().read_events() {
250 Ok(_) => Ok(()),
251 Err(CError::Protocol(_)) => Err(::nix::errno::Errno::EPROTO.into()),
252 Err(CError::Parse(_)) => Err(::nix::errno::Errno::EPROTO.into()),
253 Err(CError::Nix(::nix::Error::Sys(errno))) => Err(errno.into()),
254 Err(CError::Nix(_)) => unreachable!(),
255 }
256 }
257
cancel_read(&self)258 pub(crate) fn cancel_read(&self) {
259 // TODO: un-mock
260 }
261 }
262
message_to_rawevent(msg: Message, proxy: &ProxyInner, map: &mut super::ProxyMap) -> RawEvent263 fn message_to_rawevent(msg: Message, proxy: &ProxyInner, map: &mut super::ProxyMap) -> RawEvent {
264 let Message { opcode, args, .. } = msg;
265
266 let args = args
267 .into_iter()
268 .map(|a| match a {
269 Argument::Int(i) => crate::Argument::Int(i),
270 Argument::Uint(u) => crate::Argument::Uint(u),
271 Argument::Array(v) => {
272 crate::Argument::Array(if v.is_empty() { None } else { Some(*v) })
273 }
274 Argument::Fixed(f) => crate::Argument::Float((f as f32) / 256.),
275 Argument::Fd(f) => crate::Argument::Fd(f),
276 Argument::Str(cs) => crate::Argument::Str({
277 let bytes = cs.into_bytes();
278 if bytes.is_empty() {
279 None
280 } else {
281 Some(
282 String::from_utf8(bytes)
283 .unwrap_or_else(|e| String::from_utf8_lossy(&e.into_bytes()).into()),
284 )
285 }
286 }),
287 Argument::Object(id) => crate::Argument::Object(map.get(id)),
288 Argument::NewId(id) => crate::Argument::NewId(map.get_new(id)),
289 })
290 .collect();
291
292 RawEvent {
293 interface: proxy.object.interface,
294 opcode,
295 name: proxy.object.events[opcode as usize].name,
296 args,
297 }
298 }
299