1 // Copyright (C) 2017 Sebastian Dröge <sebastian@centricular.com>
2 //
3 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
4 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
5 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
6 // option. This file may not be copied, modified, or distributed
7 // except according to those terms.
8
9 use futures_channel::mpsc::{self, UnboundedReceiver};
10 use futures_core::Stream;
11 use futures_util::{future, StreamExt};
12 use glib;
13 use glib::prelude::*;
14 use glib::source::{Continue, Priority, SourceId};
15 use glib::translate::*;
16 use glib_sys;
17 use glib_sys::{gboolean, gpointer};
18 use gst_sys;
19 use std::cell::RefCell;
20 use std::mem::transmute;
21 use std::pin::Pin;
22 use std::task::{Context, Poll};
23
24 use Bus;
25 use BusSyncReply;
26 use Message;
27 use MessageType;
28
29 lazy_static! {
30 static ref SET_ONCE_QUARK: glib::Quark = glib::Quark::from_string("gstreamer-rs-sync-handler");
31 }
32
trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( bus: *mut gst_sys::GstBus, msg: *mut gst_sys::GstMessage, func: gpointer, ) -> gboolean33 unsafe extern "C" fn trampoline_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
34 bus: *mut gst_sys::GstBus,
35 msg: *mut gst_sys::GstMessage,
36 func: gpointer,
37 ) -> gboolean {
38 let func: &RefCell<F> = &*(func as *const RefCell<F>);
39 (&mut *func.borrow_mut())(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).to_glib()
40 }
41
destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>( ptr: gpointer, )42 unsafe extern "C" fn destroy_closure_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(
43 ptr: gpointer,
44 ) {
45 Box::<RefCell<F>>::from_raw(ptr as *mut _);
46 }
47
into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer48 fn into_raw_watch<F: FnMut(&Bus, &Message) -> Continue + 'static>(func: F) -> gpointer {
49 #[allow(clippy::type_complexity)]
50 let func: Box<RefCell<F>> = Box::new(RefCell::new(func));
51 Box::into_raw(func) as gpointer
52 }
53
trampoline_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( bus: *mut gst_sys::GstBus, msg: *mut gst_sys::GstMessage, func: gpointer, ) -> gst_sys::GstBusSyncReply54 unsafe extern "C" fn trampoline_sync<
55 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
56 >(
57 bus: *mut gst_sys::GstBus,
58 msg: *mut gst_sys::GstMessage,
59 func: gpointer,
60 ) -> gst_sys::GstBusSyncReply {
61 let f: &F = &*(func as *const F);
62 let res = f(&from_glib_borrow(bus), &Message::from_glib_borrow(msg)).to_glib();
63
64 if res == gst_sys::GST_BUS_DROP {
65 gst_sys::gst_mini_object_unref(msg as *mut _);
66 }
67
68 res
69 }
70
destroy_closure_sync< F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static, >( ptr: gpointer, )71 unsafe extern "C" fn destroy_closure_sync<
72 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
73 >(
74 ptr: gpointer,
75 ) {
76 Box::<F>::from_raw(ptr as *mut _);
77 }
78
into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>( func: F, ) -> gpointer79 fn into_raw_sync<F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static>(
80 func: F,
81 ) -> gpointer {
82 let func: Box<F> = Box::new(func);
83 Box::into_raw(func) as gpointer
84 }
85
86 impl Bus {
add_signal_watch_full(&self, priority: Priority)87 pub fn add_signal_watch_full(&self, priority: Priority) {
88 unsafe {
89 gst_sys::gst_bus_add_signal_watch_full(self.to_glib_none().0, priority.to_glib());
90 }
91 }
92
create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source where F: FnMut(&Bus, &Message) -> Continue + Send + 'static,93 pub fn create_watch<F>(&self, name: Option<&str>, priority: Priority, func: F) -> glib::Source
94 where
95 F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
96 {
97 skip_assert_initialized!();
98 unsafe {
99 let source = gst_sys::gst_bus_create_watch(self.to_glib_none().0);
100 glib_sys::g_source_set_callback(
101 source,
102 Some(transmute(trampoline_watch::<F> as usize)),
103 into_raw_watch(func),
104 Some(destroy_closure_watch::<F>),
105 );
106 glib_sys::g_source_set_priority(source, priority.to_glib());
107
108 if let Some(name) = name {
109 glib_sys::g_source_set_name(source, name.to_glib_none().0);
110 }
111
112 from_glib_full(source)
113 }
114 }
115
add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError> where F: FnMut(&Bus, &Message) -> Continue + Send + 'static,116 pub fn add_watch<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
117 where
118 F: FnMut(&Bus, &Message) -> Continue + Send + 'static,
119 {
120 unsafe {
121 let res = gst_sys::gst_bus_add_watch_full(
122 self.to_glib_none().0,
123 glib_sys::G_PRIORITY_DEFAULT,
124 Some(trampoline_watch::<F>),
125 into_raw_watch(func),
126 Some(destroy_closure_watch::<F>),
127 );
128
129 if res == 0 {
130 Err(glib_bool_error!("Bus already has a watch"))
131 } else {
132 Ok(from_glib(res))
133 }
134 }
135 }
136
add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError> where F: FnMut(&Bus, &Message) -> Continue + 'static,137 pub fn add_watch_local<F>(&self, func: F) -> Result<SourceId, glib::BoolError>
138 where
139 F: FnMut(&Bus, &Message) -> Continue + 'static,
140 {
141 unsafe {
142 assert!(glib::MainContext::ref_thread_default().is_owner());
143
144 let res = gst_sys::gst_bus_add_watch_full(
145 self.to_glib_none().0,
146 glib_sys::G_PRIORITY_DEFAULT,
147 Some(trampoline_watch::<F>),
148 into_raw_watch(func),
149 Some(destroy_closure_watch::<F>),
150 );
151
152 if res == 0 {
153 Err(glib_bool_error!("Bus already has a watch"))
154 } else {
155 Ok(from_glib(res))
156 }
157 }
158 }
159
set_sync_handler<F>(&self, func: F) where F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,160 pub fn set_sync_handler<F>(&self, func: F)
161 where
162 F: Fn(&Bus, &Message) -> BusSyncReply + Send + Sync + 'static,
163 {
164 unsafe {
165 let bus = self.to_glib_none().0;
166
167 // This is not thread-safe before 1.16.3, see
168 // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
169 if ::version() < (1, 16, 3, 0) {
170 if !gobject_sys::g_object_get_qdata(bus as *mut _, SET_ONCE_QUARK.to_glib())
171 .is_null()
172 {
173 panic!("Bus sync handler can only be set once");
174 }
175
176 gobject_sys::g_object_set_qdata(
177 bus as *mut _,
178 SET_ONCE_QUARK.to_glib(),
179 1 as *mut _,
180 );
181 }
182
183 gst_sys::gst_bus_set_sync_handler(
184 bus,
185 Some(trampoline_sync::<F>),
186 into_raw_sync(func),
187 Some(destroy_closure_sync::<F>),
188 )
189 }
190 }
191
unset_sync_handler(&self)192 pub fn unset_sync_handler(&self) {
193 // This is not thread-safe before 1.16.3, see
194 // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/merge_requests/416
195 if ::version() < (1, 16, 3, 0) {
196 return;
197 }
198
199 unsafe {
200 use std::ptr;
201
202 gst_sys::gst_bus_set_sync_handler(self.to_glib_none().0, None, ptr::null_mut(), None)
203 }
204 }
205
iter(&self) -> Iter206 pub fn iter(&self) -> Iter {
207 self.iter_timed(0.into())
208 }
209
iter_timed(&self, timeout: ::ClockTime) -> Iter210 pub fn iter_timed(&self, timeout: ::ClockTime) -> Iter {
211 Iter { bus: self, timeout }
212 }
213
iter_filtered<'a>( &'a self, msg_types: &'a [MessageType], ) -> impl Iterator<Item = Message> + 'a214 pub fn iter_filtered<'a>(
215 &'a self,
216 msg_types: &'a [MessageType],
217 ) -> impl Iterator<Item = Message> + 'a {
218 self.iter_timed_filtered(0.into(), msg_types)
219 }
220
iter_timed_filtered<'a>( &'a self, timeout: ::ClockTime, msg_types: &'a [MessageType], ) -> impl Iterator<Item = Message> + 'a221 pub fn iter_timed_filtered<'a>(
222 &'a self,
223 timeout: ::ClockTime,
224 msg_types: &'a [MessageType],
225 ) -> impl Iterator<Item = Message> + 'a {
226 self.iter_timed(timeout)
227 .filter(move |msg| msg_types.contains(&msg.get_type()))
228 }
229
timed_pop_filtered( &self, timeout: ::ClockTime, msg_types: &[MessageType], ) -> Option<Message>230 pub fn timed_pop_filtered(
231 &self,
232 timeout: ::ClockTime,
233 msg_types: &[MessageType],
234 ) -> Option<Message> {
235 loop {
236 let msg = self.timed_pop(timeout)?;
237 if msg_types.contains(&msg.get_type()) {
238 return Some(msg);
239 }
240 }
241 }
242
pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message>243 pub fn pop_filtered(&self, msg_types: &[MessageType]) -> Option<Message> {
244 loop {
245 let msg = self.pop()?;
246 if msg_types.contains(&msg.get_type()) {
247 return Some(msg);
248 }
249 }
250 }
251
stream(&self) -> BusStream252 pub fn stream(&self) -> BusStream {
253 BusStream::new(self)
254 }
255
stream_filtered<'a>( &self, message_types: &'a [MessageType], ) -> impl Stream<Item = Message> + Unpin + Send + 'a256 pub fn stream_filtered<'a>(
257 &self,
258 message_types: &'a [MessageType],
259 ) -> impl Stream<Item = Message> + Unpin + Send + 'a {
260 self.stream().filter(move |message| {
261 let message_type = message.get_type();
262
263 future::ready(message_types.contains(&message_type))
264 })
265 }
266 }
267
268 #[derive(Debug)]
269 pub struct Iter<'a> {
270 bus: &'a Bus,
271 timeout: ::ClockTime,
272 }
273
274 impl<'a> Iterator for Iter<'a> {
275 type Item = Message;
276
next(&mut self) -> Option<Message>277 fn next(&mut self) -> Option<Message> {
278 self.bus.timed_pop(self.timeout)
279 }
280 }
281
282 #[derive(Debug)]
283 pub struct BusStream {
284 bus: glib::WeakRef<Bus>,
285 receiver: UnboundedReceiver<Message>,
286 }
287
288 impl BusStream {
new(bus: &Bus) -> Self289 pub fn new(bus: &Bus) -> Self {
290 skip_assert_initialized!();
291
292 let (sender, receiver) = mpsc::unbounded();
293
294 bus.set_sync_handler(move |_, message| {
295 let _ = sender.unbounded_send(message.to_owned());
296
297 BusSyncReply::Drop
298 });
299
300 Self {
301 bus: bus.downgrade(),
302 receiver,
303 }
304 }
305 }
306
307 impl Drop for BusStream {
drop(&mut self)308 fn drop(&mut self) {
309 if let Some(bus) = self.bus.upgrade() {
310 bus.unset_sync_handler();
311 }
312 }
313 }
314
315 impl Stream for BusStream {
316 type Item = Message;
317
poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>>318 fn poll_next(mut self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
319 self.receiver.poll_next_unpin(context)
320 }
321 }
322
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Mutex};

    /// A sync handler must be invoked for a message posted on the bus.
    #[test]
    fn test_sync_handler() {
        ::init().unwrap();

        let bus = Bus::new();
        let received = Arc::new(Mutex::new(Vec::new()));
        {
            let received = Arc::clone(&received);
            bus.set_sync_handler(move |_, msg| {
                received.lock().unwrap().push(msg.clone());
                BusSyncReply::Pass
            });
        }

        bus.post(&::Message::new_eos().build()).unwrap();

        let received = received.lock().unwrap();
        assert_eq!(received.len(), 1);
        match received[0].view() {
            ::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }

    /// A message posted on the bus must come out of its `BusStream`.
    #[test]
    fn test_bus_stream() {
        ::init().unwrap();

        let bus = Bus::new();
        let stream = bus.stream();

        let eos = ::Message::new_eos().build();
        bus.post(&eos).unwrap();

        let (first, _) = futures_executor::block_on(stream.into_future());

        match first.unwrap().view() {
            ::MessageView::Eos(_) => (),
            _ => unreachable!(),
        }
    }
}
369