1 // Take a look at the license at the top of the repository in the LICENSE file.
2
3 use futures_channel::mpsc::{self, UnboundedReceiver};
4 use futures_core::Stream;
5 use futures_util::StreamExt;
6 use glib::ffi::{gboolean, gpointer};
7 use glib::prelude::*;
8 use glib::source::{Continue, Priority, SourceId};
9 use glib::translate::*;
10 use std::cell::RefCell;
11 use std::future;
12 use std::mem::transmute;
13 use std::pin::Pin;
14 use std::task::{Context, Poll};
15
16 use crate::Bus;
17 use crate::BusSyncReply;
18 use crate::Message;
19 use crate::MessageType;
20
trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( bus: *mut ffi::GstBus, msg: *mut ffi::GstMessage, func: gpointer, ) -> gboolean21 unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
22 bus: *mut ffi::GstBus,
23 msg: *mut ffi::GstMessage,
24 func: gpointer,
25 ) -> gboolean {
26 let func: &RefCell<F> = &*(func as *const RefCell<F>);
27 (&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib()
28 }
29
destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( ptr: gpointer, )30 unsafe extern "C" fn destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
31 ptr: gpointer,
32 ) {
33 Box::<RefCell<F>>::from_raw(ptr as *mut _);
34 }
35
into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer36 fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer {
37 #[allow(clippy::type_complexity)]
38 let func: Box<RefCell<F>> = Box::new(RefCell::new(func));
39 Box::into_raw(func) as gpointer
40 }
41
trampoline_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( bus: *mut ffi::GstBus, msg: *mut ffi::GstMessage, func: gpointer, ) -> ffi::GstBusSyncReply42 unsafe extern "C" fn trampoline_sync<
43 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
44 >(
45 bus: *mut ffi::GstBus,
46 msg: *mut ffi::GstMessage,
47 func: gpointer,
48 ) -> ffi::GstBusSyncReply {
49 let f: &F = &*(func as *const F);
50 let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).into_glib();
51
52 if res == ffi::GST_BUS_DROP {
53 ffi::gst_mini_object_unref(msg as *mut _);
54 }
55
56 res
57 }
58
destroy_closure_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( ptr: gpointer, )59 unsafe extern "C" fn destroy_closure_sync<
60 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
61 >(
62 ptr: gpointer,
63 ) {
64 Box::<F>::from_raw(ptr as *mut _);
65 }
66
into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>( func: F, ) -> gpointer67 fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
68 func: F,
69 ) -> gpointer {
70 let func: Box<F> = Box::new(func);
71 Box::into_raw(func) as gpointer
72 }
73
74 impl Bus {
75 #[doc(alias = "gst_bus_add_signal_watch_full")]
add_signal_watch_full(&self, priority: Priority)76 pub fn add_signal_watch_full(&self, priority: Priority) {
77 unsafe {
78 ffi::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.into_glib());
79 }
80 }
81
82 #[doc(alias = "gst_bus_create_watch")]
create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source where F: FnMut(&Bus, &Message) -> Continue + Send + 'static,83 pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
84 where
85 F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
86 {
87 skip_assert_initialized!();
88 unsafe {
89 let source = ffi::gst_bus_create_watch(self.to_glib_none().0);
90 glib::ffi::g_source_set_callback(
91 source,
92 Some(transmute::<
93 _,
94 unsafe extern "C" fn(glib::ffi::gpointer) -> i32,
95 >(trampoline_watch::<F> as *const ())),
96 into_raw_watch(func),
97 Some(destroy_closure_watch::<F>),
98 );
99 glib::ffi::g_source_set_priority(source, priority.into_glib());
100
101 if let Some(name) = name {
102 glib::ffi::g_source_set_name(source, name.to_glib_none().0);
103 }
104
105 from_glib_full(source)
106 }
107 }
108
109 #[doc(alias = "gst_bus_add_watch_full")]
add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError> where F: FnMut(&Bus, &Message) -> Continue + Send + 'static,110 pub fn add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
111 where
112 F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
113 {
114 unsafe {
115 let res = ffi::gst_bus_add_watch_full(
116 self.to_glib_none().0,
117 glib::ffi::G_PRIORITY_DEFAULT,
118 Some(trampoline_watch::<F>),
119 into_raw_watch(func),
120 Some(destroy_closure_watch::<F>),
121 );
122
123 if res == 0 {
124 Err(glib::bool_error!("Bus already has a watch"))
125 } else {
126 Ok(from_glib(res))
127 }
128 }
129 }
130
add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError> where F: FnMut(&Bus, &Message) -> Continue + 'static,131 pub fn add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
132 where
133 F: FnMut(&Bus, &Message) -> Continue + 'static,
134 {
135 unsafe {
136 assert!(glib::MainContext::ref_thread_default().is_owner());
137
138 let res = ffi::gst_bus_add_watch_full(
139 self.to_glib_none().0,
140 glib::ffi::G_PRIORITY_DEFAULT,
141 Some(trampoline_watch::<F>),
142 into_raw_watch(func),
143 Some(destroy_closure_watch::<F>),
144 );
145
146 if res == 0 {
147 Err(glib::bool_error!("Bus already has a watch"))
148 } else {
149 Ok(from_glib(res))
150 }
151 }
152 }
153
154 #[doc(alias = "gst_bus_set_sync_handler")]
set_sync_handler<F>(&self, func: F) where F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,155 pub fn set_sync_handler<F>(&self, func: F)
156 where
157 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
158 {
159 use once_cell::sync::Lazy;
160 static SET_ONCE_QUARK: Lazy<glib::Quark> =
161 Lazy::new(|| glib::Quark::from_string("gstreamer-rs-sync-handler"));
162
163 unsafe {
164 let bus = self.to_glib_none().0;
165
166 // This is not thread-safe before 1.16.3, see
167 // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
168 if crate::version() < (1, 16, 3, 0) {
169 if !glib::gobject_ffi::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.into_glib())
170 .is_null()
171 {
172 panic!("Bus sync handler can only be set once");
173 }
174
175 glib::gobject_ffi::g_object_set_qdata(
176 bus as *mut _,
177 SET_ONCE_QUARK.into_glib(),
178 1 as *mut _,
179 );
180 }
181
182 ffi::gst_bus_set_sync_handler(
183 bus,
184 Some(trampoline_sync::<F>),
185 into_raw_sync(func),
186 Some(destroy_closure_sync::<F>),
187 )
188 }
189 }
190
unset_sync_handler(&self)191 pub fn unset_sync_handler(&self) {
192 // This is not thread-safe before 1.16.3, see
193 // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
194 if crate::version() < (1, 16, 3, 0) {
195 return;
196 }
197
198 unsafe {
199 use std::ptr;
200
201 ffi::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
202 }
203 }
204
iter(&self) -> Iter205 pub fn iter(&self) -> Iter {
206 self.iter_timed(Some(crate::ClockTime::ZERO))
207 }
208
iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter209 pub fn iter_timed(&self, timeout: impl Into<Option<crate::ClockTime>>) -> Iter {
210 Iter {
211 bus: self,
212 timeout: timeout.into(),
213 }
214 }
215
iter_filtered<'a>( &'a self, msg_types: &'a [MessageType], ) -> impl Iterator<Item = Message> + 'a216 pub fn iter_filtered<'a>(
217 &'a self,
218 msg_types: &'a [MessageType],
219 ) -> impl Iterator<Item = Message> + 'a {
220 self.iter_timed_filtered(Some(crate::ClockTime::ZERO), msg_types)
221 }
222
iter_timed_filtered<'a>( &'a self, timeout: impl Into<Option<crate::ClockTime>>, msg_types: &'a [MessageType], ) -> impl Iterator<Item = Message> + 'a223 pub fn iter_timed_filtered<'a>(
224 &'a self,
225 timeout: impl Into<Option<crate::ClockTime>>,
226 msg_types: &'a [MessageType],
227 ) -> impl Iterator<Item = Message> + 'a {
228 self.iter_timed(timeout)
229 .filter(move |msg| msg_types.contains(&msg.type_()))
230 }
231
timed_pop_filtered( &self, timeout: impl Into<Option<crate::ClockTime>> + Clone, msg_types: &[MessageType], ) -> Option<Message>232 pub fn timed_pop_filtered(
233 &self,
234 timeout: impl Into<Option<crate::ClockTime>> + Clone,
235 msg_types: &[MessageType],
236 ) -> Option<Message> {
237 loop {
238 let msg = self.timed_pop(timeout.clone())?;
239 if msg_types.contains(&msg.type_()) {
240 return Some(msg);
241 }
242 }
243 }
244
pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message>245 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
246 loop {
247 let msg = self.pop()?;
248 if msg_types.contains(&msg.type_()) {
249 return Some(msg);
250 }
251 }
252 }
253
stream(&self) -> BusStream254 pub fn stream(&self) -> BusStream {
255 BusStream::new(self)
256 }
257
stream_filtered<'a>( &self, message_types: &'a [MessageType], ) -> impl Stream<Item = Message> + Unpin + Send + 'a258 pub fn stream_filtered<'a>(
259 &self,
260 message_types: &'a [MessageType],
261 ) -> impl Stream<Item = Message> + Unpin + Send + 'a {
262 self.stream().filter(move |message| {
263 let message_type = message.type_();
264
265 future::ready(message_types.contains(&message_type))
266 })
267 }
268 }
269
270 #[derive(Debug)]
271 pub struct Iter<'a> {
272 bus: &'a Bus,
273 timeout: Option<crate::ClockTime>,
274 }
275
276 impl<'a> Iterator for Iter<'a> {
277 type Item = Message;
278
next(&mut self) -> Option<Message>279 fn next(&mut self) -> Option<Message> {
280 self.bus.timed_pop(self.timeout)
281 }
282 }
283
284 #[derive(Debug)]
285 pub struct BusStream {
286 bus: glib::WeakRef<Bus>,
287 receiver: UnboundedReceiver<Message>,
288 }
289
290 impl BusStream {
new(bus: &Bus) -> Self291 fn new(bus: &Bus) -> Self {
292 skip_assert_initialized!();
293
294 let (sender, receiver) = mpsc::unbounded();
295
296 bus.set_sync_handler(move |_, message| {
297 let _ = sender.unbounded_send(message.to_owned());
298
299 BusSyncReply::Drop
300 });
301
302 Self {
303 bus: bus.downgrade(),
304 receiver,
305 }
306 }
307 }
308
309 impl Drop for BusStream {
drop(&mut self)310 fn drop(&mut self) {
311 if let Some(bus) = self.bus.upgrade() {
312 bus.unset_sync_handler();
313 }
314 }
315 }
316
317 impl Stream for BusStream {
318 type Item = Message;
319
poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>>320 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
321 self.receiver.poll_next_unpin(context)
322 }
323 }
324
325 #[cfg(test)]
326 mod tests {
327 use super::*;
328 use std::sync::{Arc, Mutex};
329
330 #[test]
test_sync_handler()331 fn test_sync_handler() {
332 crate::init().unwrap();
333
334 let bus = Bus::new();
335 let msgs = Arc::new(Mutex::new(Vec::new()));
336 let msgs_clone = msgs.clone();
337 bus.set_sync_handler(move |_, msg| {
338 msgs_clone.lock().unwrap().push(msg.clone());
339 BusSyncReply::Pass
340 });
341
342 bus.post(&crate::message::Eos::new()).unwrap();
343
344 let msgs = msgs.lock().unwrap();
345 assert_eq!(msgs.len(), 1);
346 match msgs[0].view() {
347 crate::MessageView::Eos(_) => (),
348 _ => unreachable!(),
349 }
350 }
351
352 #[test]
test_bus_stream()353 fn test_bus_stream() {
354 crate::init().unwrap();
355
356 let bus = Bus::new();
357 let bus_stream = bus.stream();
358
359 let eos_message = crate::message::Eos::new();
360 bus.post(&eos_message).unwrap();
361
362 let bus_future = bus_stream.into_future();
363 let (message, _) = futures_executor::block_on(bus_future);
364
365 match message.unwrap().view() {
366 crate::MessageView::Eos(_) => (),
367 _ => unreachable!(),
368 }
369 }
370 }
371