1 // Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
2 //
3 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6 // option. This file may not be copied, modified, or distributed
7 // except according to those terms.
8 
9 use futures_channel::mpsc::{self, UnboundedReceiver};
10 use futures_core::Stream;
11 use futures_util::{future, StreamExt};
12 use glib;
13 use glib::prelude::*;
14 use glib::source::{Continue, Priority, SourceId};
15 use glib::translate::*;
16 use glib_sys;
17 use glib_sys::{gboolean, gpointer};
18 use gst_sys;
19 use std::cell::RefCell;
20 use std::mem::transmute;
21 use std::pin::Pin;
22 use std::task::{Context, Poll};
23 
24 use Bus;
25 use BusSyncReply;
26 use Message;
27 use MessageType;
28 
29 lazy_static! {
30     static ref SET_ONCE_QUARK: glib::Quark = glib::Quark::from_string("gstreamer-rs-sync-handler");
31 }
32 
trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( bus: *mut gst_sys::GstBus, msg: *mut gst_sys::GstMessage, func: gpointer, ) -> gboolean33 unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
34     bus: *mut gst_sys::GstBus,
35     msg: *mut gst_sys::GstMessage,
36     func: gpointer,
37 ) -> gboolean {
38     let func: &RefCell<F> = &*(func as *const RefCell<F>);
39     (&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).to_glib()
40 }
41 
destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( ptr: gpointer, )42 unsafe extern "C" fn destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
43     ptr: gpointer,
44 ) {
45     Box::<RefCell<F>>::from_raw(ptr as *mut _);
46 }
47 
into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer48 fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer {
49     #[allow(clippy::type_complexity)]
50     let func: Box<RefCell<F>> = Box::new(RefCell::new(func));
51     Box::into_raw(func) as gpointer
52 }
53 
trampoline_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( bus: *mut gst_sys::GstBus, msg: *mut gst_sys::GstMessage, func: gpointer, ) -> gst_sys::GstBusSyncReply54 unsafe extern "C" fn trampoline_sync<
55     F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
56 >(
57     bus: *mut gst_sys::GstBus,
58     msg: *mut gst_sys::GstMessage,
59     func: gpointer,
60 ) -> gst_sys::GstBusSyncReply {
61     let f: &F = &*(func as *const F);
62     let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).to_glib();
63 
64     if res == gst_sys::GST_BUS_DROP {
65         gst_sys::gst_mini_object_unref(msg as *mut _);
66     }
67 
68     res
69 }
70 
destroy_closure_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( ptr: gpointer, )71 unsafe extern "C" fn destroy_closure_sync<
72     F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
73 >(
74     ptr: gpointer,
75 ) {
76     Box::<F>::from_raw(ptr as *mut _);
77 }
78 
into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>( func: F, ) -> gpointer79 fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
80     func: F,
81 ) -> gpointer {
82     let func: Box<F> = Box::new(func);
83     Box::into_raw(func) as gpointer
84 }
85 
86 impl Bus {
add_signal_watch_full(&self, priority: Priority)87     pub fn add_signal_watch_full(&self, priority: Priority) {
88         unsafe {
89             gst_sys::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.to_glib());
90         }
91     }
92 
create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source where F: FnMut(&Bus, &Message) -> Continue + Send + 'static,93     pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
94     where
95         F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
96     {
97         skip_assert_initialized!();
98         unsafe {
99             let source = gst_sys::gst_bus_create_watch(self.to_glib_none().0);
100             glib_sys::g_source_set_callback(
101                 source,
102                 Some(transmute(trampoline_watch::<F> as usize)),
103                 into_raw_watch(func),
104                 Some(destroy_closure_watch::<F>),
105             );
106             glib_sys::g_source_set_priority(source, priority.to_glib());
107 
108             if let Some(name) = name {
109                 glib_sys::g_source_set_name(source, name.to_glib_none().0);
110             }
111 
112             from_glib_full(source)
113         }
114     }
115 
add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError> where F: FnMut(&Bus, &Message) -> Continue + Send + 'static,116     pub fn add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
117     where
118         F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
119     {
120         unsafe {
121             let res = gst_sys::gst_bus_add_watch_full(
122                 self.to_glib_none().0,
123                 glib_sys::G_PRIORITY_DEFAULT,
124                 Some(trampoline_watch::<F>),
125                 into_raw_watch(func),
126                 Some(destroy_closure_watch::<F>),
127             );
128 
129             if res == 0 {
130                 Err(glib_bool_error!("Bus already has a watch"))
131             } else {
132                 Ok(from_glib(res))
133             }
134         }
135     }
136 
add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError> where F: FnMut(&Bus, &Message) -> Continue + 'static,137     pub fn add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
138     where
139         F: FnMut(&Bus, &Message) -> Continue + 'static,
140     {
141         unsafe {
142             assert!(glib::MainContext::ref_thread_default().is_owner());
143 
144             let res = gst_sys::gst_bus_add_watch_full(
145                 self.to_glib_none().0,
146                 glib_sys::G_PRIORITY_DEFAULT,
147                 Some(trampoline_watch::<F>),
148                 into_raw_watch(func),
149                 Some(destroy_closure_watch::<F>),
150             );
151 
152             if res == 0 {
153                 Err(glib_bool_error!("Bus already has a watch"))
154             } else {
155                 Ok(from_glib(res))
156             }
157         }
158     }
159 
set_sync_handler<F>(&self, func: F) where F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,160     pub fn set_sync_handler<F>(&self, func: F)
161     where
162         F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
163     {
164         unsafe {
165             let bus = self.to_glib_none().0;
166 
167             // This is not thread-safe before 1.16.3, see
168             // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
169             if ::version() < (1, 16, 3, 0) {
170                 if !gobject_sys::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib())
171                     .is_null()
172                 {
173                     panic!("Bus sync handler can only be set once");
174                 }
175 
176                 gobject_sys::g_object_set_qdata(
177                     bus as *mut _,
178                     SET_ONCE_QUARK.to_glib(),
179                     1 as *mut _,
180                 );
181             }
182 
183             gst_sys::gst_bus_set_sync_handler(
184                 bus,
185                 Some(trampoline_sync::<F>),
186                 into_raw_sync(func),
187                 Some(destroy_closure_sync::<F>),
188             )
189         }
190     }
191 
unset_sync_handler(&self)192     pub fn unset_sync_handler(&self) {
193         // This is not thread-safe before 1.16.3, see
194         // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
195         if ::version() < (1, 16, 3, 0) {
196             return;
197         }
198 
199         unsafe {
200             use std::ptr;
201 
202             gst_sys::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
203         }
204     }
205 
iter(&self) -> Iter206     pub fn iter(&self) -> Iter {
207         self.iter_timed(0.into())
208     }
209 
iter_timed(&self, timeout: ::ClockTime) -> Iter210     pub fn iter_timed(&self, timeout: ::ClockTime) -> Iter {
211         Iter { bus: self, timeout }
212     }
213 
iter_filtered<'a>( &'a self, msg_types: &'a [MessageType], ) -> impl Iterator<Item = Message> + 'a214     pub fn iter_filtered<'a>(
215         &'a self,
216         msg_types: &'a [MessageType],
217     ) -> impl Iterator<Item = Message> + 'a {
218         self.iter_timed_filtered(0.into(), msg_types)
219     }
220 
iter_timed_filtered<'a>( &'a self, timeout: ::ClockTime, msg_types: &'a [MessageType], ) -> impl Iterator<Item = Message> + 'a221     pub fn iter_timed_filtered<'a>(
222         &'a self,
223         timeout: ::ClockTime,
224         msg_types: &'a [MessageType],
225     ) -> impl Iterator<Item = Message> + 'a {
226         self.iter_timed(timeout)
227             .filter(move |msg| msg_types.contains(&msg.get_type()))
228     }
229 
timed_pop_filtered( &self, timeout: ::ClockTime, msg_types: &[MessageType], ) -> Option<Message>230     pub fn timed_pop_filtered(
231         &self,
232         timeout: ::ClockTime,
233         msg_types: &[MessageType],
234     ) -> Option<Message> {
235         loop {
236             let msg = self.timed_pop(timeout)?;
237             if msg_types.contains(&msg.get_type()) {
238                 return Some(msg);
239             }
240         }
241     }
242 
pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message>243     pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
244         loop {
245             let msg = self.pop()?;
246             if msg_types.contains(&msg.get_type()) {
247                 return Some(msg);
248             }
249         }
250     }
251 
stream(&self) -> BusStream252     pub fn stream(&self) -> BusStream {
253         BusStream::new(self)
254     }
255 
stream_filtered<'a>( &self, message_types: &'a [MessageType], ) -> impl Stream<Item = Message> + Unpin + Send + 'a256     pub fn stream_filtered<'a>(
257         &self,
258         message_types: &'a [MessageType],
259     ) -> impl Stream<Item = Message> + Unpin + Send + 'a {
260         self.stream().filter(move |message| {
261             let message_type = message.get_type();
262 
263             future::ready(message_types.contains(&message_type))
264         })
265     }
266 }
267 
268 #[derive(Debug)]
269 pub struct Iter<'a> {
270     bus: &'a Bus,
271     timeout: ::ClockTime,
272 }
273 
274 impl<'a> Iterator for Iter<'a> {
275     type Item = Message;
276 
next(&mut self) -> Option<Message>277     fn next(&mut self) -> Option<Message> {
278         self.bus.timed_pop(self.timeout)
279     }
280 }
281 
282 #[derive(Debug)]
283 pub struct BusStream {
284     bus: glib::WeakRef<Bus>,
285     receiver: UnboundedReceiver<Message>,
286 }
287 
288 impl BusStream {
new(bus: &Bus) -> Self289     pub fn new(bus: &Bus) -> Self {
290         skip_assert_initialized!();
291 
292         let (sender, receiver) = mpsc::unbounded();
293 
294         bus.set_sync_handler(move |_, message| {
295             let _ = sender.unbounded_send(message.to_owned());
296 
297             BusSyncReply::Drop
298         });
299 
300         Self {
301             bus: bus.downgrade(),
302             receiver,
303         }
304     }
305 }
306 
307 impl Drop for BusStream {
drop(&mut self)308     fn drop(&mut self) {
309         if let Some(bus) = self.bus.upgrade() {
310             bus.unset_sync_handler();
311         }
312     }
313 }
314 
315 impl Stream for BusStream {
316     type Item = Message;
317 
poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>>318     fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
319         self.receiver.poll_next_unpin(context)
320     }
321 }
322 
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// A sync handler must see a posted EOS message exactly once.
    #[test]
    fn test_sync_handler() {
        ::init().unwrap();

        let bus = Bus::new();
        let received = Arc::new(Mutex::new(Vec::new()));

        let sink = Arc::clone(&received);
        bus.set_sync_handler(move |_, msg| {
            sink.lock().unwrap().push(msg.clone());
            BusSyncReply::Pass
        });

        bus.post(&::Message::new_eos().build()).unwrap();

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 1);
        match received[0].view() {
            ::MessageView::Eos(..) => {}
            _ => unreachable!(),
        }
    }

    /// A bus stream must yield a posted EOS message as its first item.
    #[test]
    fn test_bus_stream() {
        ::init().unwrap();

        let bus = Bus::new();
        let stream = bus.stream();

        let eos = ::Message::new_eos().build();
        bus.post(&eos).unwrap();

        let (first, _) = futures_executor::block_on(stream.into_future());

        match first.unwrap().view() {
            ::MessageView::Eos(..) => {}
            _ => unreachable!(),
        }
    }
}
369