1 // Take a look at the license at the top of the repository in the LICENSE file. 2 3 use crate::Clock; 4 use crate::ClockEntryType; 5 use crate::ClockError; 6 use crate::ClockFlags; 7 use crate::ClockReturn; 8 use crate::ClockSuccess; 9 use crate::ClockTime; 10 use crate::ClockTimeDiff; 11 use glib::ffi::{gboolean, gpointer}; 12 use glib::prelude::*; 13 use glib::translate::*; 14 use libc::c_void; 15 use std::cmp; 16 use std::convert; 17 use std::ptr; 18 19 use futures_core::{Future, Stream}; 20 use std::marker::Unpin; 21 use std::pin::Pin; 22 use std::sync::atomic; 23 use std::sync::atomic::AtomicI32; 24 25 glib::wrapper! { 26 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] 27 pub struct ClockId(Shared<c_void>); 28 29 match fn { 30 ref => |ptr| ffi::gst_clock_id_ref(ptr), 31 unref => |ptr| ffi::gst_clock_id_unref(ptr), 32 } 33 } 34 35 impl ClockId { 36 #[doc(alias = "get_time")] 37 #[doc(alias = "gst_clock_id_get_time")] time(&self) -> ClockTime38 pub fn time(&self) -> ClockTime { 39 unsafe { 40 try_from_glib(ffi::gst_clock_id_get_time(self.to_glib_none().0)) 41 .expect("undefined time") 42 } 43 } 44 45 #[doc(alias = "gst_clock_id_unschedule")] unschedule(&self)46 pub fn unschedule(&self) { 47 unsafe { ffi::gst_clock_id_unschedule(self.to_glib_none().0) } 48 } 49 50 #[doc(alias = "gst_clock_id_wait")] wait(&self) -> (Result<ClockSuccess, ClockError>, ClockTimeDiff)51 pub fn wait(&self) -> (Result<ClockSuccess, ClockError>, ClockTimeDiff) { 52 unsafe { 53 let mut jitter = 0; 54 let res = try_from_glib(ffi::gst_clock_id_wait(self.to_glib_none().0, &mut jitter)); 55 (res, jitter) 56 } 57 } 58 compare_by_time(&self, other: &Self) -> cmp::Ordering59 pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { 60 unsafe { 61 let res = ffi::gst_clock_id_compare_func(self.to_glib_none().0, other.to_glib_none().0); 62 res.cmp(&0) 63 } 64 } 65 66 #[cfg(any(feature = "v1_16", feature = "dox"))] 67 #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_16")))] 68 #[doc(alias = "get_clock")] 69 #[doc(alias = "gst_clock_id_get_clock")] clock(&self) -> Option<Clock>70 pub fn clock(&self) -> Option<Clock> { 71 unsafe { from_glib_full(ffi::gst_clock_id_get_clock(self.to_glib_none().0)) } 72 } 73 74 #[cfg(any(feature = "v1_16", feature = "dox"))] 75 #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_16")))] 76 #[doc(alias = "gst_clock_id_uses_clock")] uses_clock<P: IsA<Clock>>(&self, clock: &P) -> bool77 pub fn uses_clock<P: IsA<Clock>>(&self, clock: &P) -> bool { 78 unsafe { 79 from_glib(ffi::gst_clock_id_uses_clock( 80 self.to_glib_none().0, 81 clock.as_ref().as_ptr(), 82 )) 83 } 84 } 85 86 #[doc(alias = "get_type")] type_(&self) -> ClockEntryType87 pub fn type_(&self) -> ClockEntryType { 88 unsafe { 89 let ptr: *mut ffi::GstClockEntry = self.to_glib_none().0 as *mut _; 90 from_glib((*ptr).type_) 91 } 92 } 93 94 #[doc(alias = "get_status")] status(&self) -> &AtomicClockReturn95 pub fn status(&self) -> &AtomicClockReturn { 96 unsafe { 97 let ptr: *mut ffi::GstClockEntry = self.to_glib_none().0 as *mut _; 98 &*((&(*ptr).status) as *const i32 as *const AtomicClockReturn) 99 } 100 } 101 } 102 103 #[derive(Clone, Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] 104 pub struct SingleShotClockId(ClockId); 105 106 impl std::ops::Deref for SingleShotClockId { 107 type Target = ClockId; 108 deref(&self) -> &Self::Target109 fn deref(&self) -> &Self::Target { 110 &self.0 111 } 112 } 113 114 impl From<SingleShotClockId> for ClockId { from(id: SingleShotClockId) -> ClockId115 fn from(id: SingleShotClockId) -> ClockId { 116 skip_assert_initialized!(); 117 id.0 118 } 119 } 120 121 impl convert::TryFrom<ClockId> for SingleShotClockId { 122 type Error = glib::BoolError; 123 try_from(id: ClockId) -> Result<SingleShotClockId, glib::BoolError>124 fn try_from(id: ClockId) -> Result<SingleShotClockId, glib::BoolError> { 125 skip_assert_initialized!(); 126 match id.type_() { 127 ClockEntryType::Single => Ok(SingleShotClockId(id)), 128 _ => Err(glib::bool_error!("Not a single-shot clock id")), 129 } 130 } 131 } 132 133 impl SingleShotClockId { compare_by_time(&self, other: &Self) -> cmp::Ordering134 pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { 135 self.0.compare_by_time(&other.0) 136 } 137 138 #[doc(alias = "gst_clock_id_wait_async")] wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError> where F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,139 pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError> 140 where 141 F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static, 142 { 143 unsafe extern "C" fn trampoline< 144 F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static, 145 >( 146 clock: *mut ffi::GstClock, 147 time: ffi::GstClockTime, 148 id: gpointer, 149 func: gpointer, 150 ) -> gboolean { 151 let f: &mut Option<F> = &mut *(func as *mut Option<F>); 152 let f = f.take().unwrap(); 153 154 f( 155 &from_glib_borrow(clock), 156 from_glib(time), 157 &from_glib_borrow(id), 158 ); 159 160 glib::ffi::GTRUE 161 } 162 163 unsafe extern "C" fn destroy_notify< 164 F: FnOnce(&Clock, Option<ClockTime>, &ClockId) + Send + 'static, 165 >( 166 ptr: gpointer, 167 ) { 168 Box::<Option<F>>::from_raw(ptr as *mut _); 169 } 170 171 let func: Box<Option<F>> = Box::new(Some(func)); 172 173 unsafe { 174 try_from_glib(ffi::gst_clock_id_wait_async( 175 self.to_glib_none().0, 176 Some(trampoline::<F>), 177 Box::into_raw(func) as gpointer, 178 Some(destroy_notify::<F>), 179 )) 180 } 181 } 182 183 #[allow(clippy::type_complexity)] wait_async_future( &self, ) -> Result< Pin< Box< dyn Future<Output = Result<(Option<ClockTime>, ClockId), ClockError>> + Send + 'static, >, >, ClockError, >184 pub fn wait_async_future( 185 &self, 186 ) -> Result< 187 Pin< 188 Box< 189 dyn Future<Output = Result<(Option<ClockTime>, ClockId), ClockError>> 190 + Send 191 + 'static, 192 >, 193 >, 194 ClockError, 195 > { 196 use futures_channel::oneshot; 197 198 let (sender, receiver) = oneshot::channel(); 199 200 self.wait_async(move |_clock, jitter, id| { 201 if sender.send((jitter, id.clone())).is_err() { 202 // Unschedule any future calls if the receiver end is disconnected 203 id.unschedule(); 204 } 205 })?; 206 207 Ok(Box::pin(async move { 208 receiver.await.map_err(|_| ClockError::Unscheduled) 209 })) 210 } 211 } 212 213 #[derive(Debug, PartialOrd, Ord, PartialEq, Eq, Hash)] 214 pub struct PeriodicClockId(ClockId); 215 216 impl std::ops::Deref for PeriodicClockId { 217 type Target = ClockId; 218 deref(&self) -> &Self::Target219 fn deref(&self) -> &Self::Target { 220 &self.0 221 } 222 } 223 224 impl From<PeriodicClockId> for ClockId { from(id: PeriodicClockId) -> ClockId225 fn from(id: PeriodicClockId) -> ClockId { 226 skip_assert_initialized!(); 227 id.0 228 } 229 } 230 231 impl convert::TryFrom<ClockId> for PeriodicClockId { 232 type Error = glib::BoolError; 233 try_from(id: ClockId) -> Result<PeriodicClockId, glib::BoolError>234 fn try_from(id: ClockId) -> Result<PeriodicClockId, glib::BoolError> { 235 skip_assert_initialized!(); 236 match id.type_() { 237 ClockEntryType::Periodic => Ok(PeriodicClockId(id)), 238 _ => Err(glib::bool_error!("Not a periodic clock id")), 239 } 240 } 241 } 242 243 impl PeriodicClockId { 244 #[doc(alias = "get_interval")] interval(&self) -> ClockTime245 pub fn interval(&self) -> ClockTime { 246 unsafe { 247 let ptr: *mut ffi::GstClockEntry = self.to_glib_none().0 as *mut _; 248 try_from_glib((*ptr).interval).expect("undefined interval") 249 } 250 } 251 compare_by_time(&self, other: &Self) -> cmp::Ordering252 pub fn compare_by_time(&self, other: &Self) -> cmp::Ordering { 253 self.0.compare_by_time(&other.0) 254 } 255 256 #[doc(alias = "gst_clock_id_wait_async")] wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError> where F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static,257 pub fn wait_async<F>(&self, func: F) -> Result<ClockSuccess, ClockError> 258 where 259 F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static, 260 { 261 unsafe extern "C" fn trampoline< 262 F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static, 263 >( 264 clock: *mut ffi::GstClock, 265 time: ffi::GstClockTime, 266 id: gpointer, 267 func: gpointer, 268 ) -> gboolean { 269 let f: &F = &*(func as *const F); 270 f( 271 &from_glib_borrow(clock), 272 from_glib(time), 273 &from_glib_borrow(id), 274 ); 275 glib::ffi::GTRUE 276 } 277 278 unsafe extern "C" fn destroy_notify< 279 F: Fn(&Clock, Option<ClockTime>, &ClockId) + Send + 'static, 280 >( 281 ptr: gpointer, 282 ) { 283 Box::<F>::from_raw(ptr as *mut _); 284 } 285 286 let func: Box<F> = Box::new(func); 287 unsafe { 288 try_from_glib(ffi::gst_clock_id_wait_async( 289 self.to_glib_none().0, 290 Some(trampoline::<F>), 291 Box::into_raw(func) as gpointer, 292 Some(destroy_notify::<F>), 293 )) 294 } 295 } 296 297 #[allow(clippy::type_complexity)] wait_async_stream( &self, ) -> Result< Pin<Box<dyn Stream<Item = (Option<ClockTime>, ClockId)> + Unpin + Send + 'static>>, ClockError, >298 pub fn wait_async_stream( 299 &self, 300 ) -> Result< 301 Pin<Box<dyn Stream<Item = (Option<ClockTime>, ClockId)> + Unpin + Send + 'static>>, 302 ClockError, 303 > { 304 use futures_channel::mpsc; 305 306 let (sender, receiver) = mpsc::unbounded(); 307 308 self.wait_async(move |_clock, jitter, id| { 309 if sender.unbounded_send((jitter, id.clone())).is_err() { 310 // Unschedule any future calls if the receiver end is disconnected 311 id.unschedule(); 312 } 313 })?; 314 315 Ok(Box::pin(receiver)) 316 } 317 } 318 319 #[repr(transparent)] 320 #[derive(Debug)] 321 pub struct AtomicClockReturn(AtomicI32); 322 323 impl AtomicClockReturn { load(&self) -> ClockReturn324 pub fn load(&self) -> ClockReturn { 325 unsafe { from_glib(self.0.load(atomic::Ordering::SeqCst)) } 326 } 327 store(&self, val: ClockReturn)328 pub fn store(&self, val: ClockReturn) { 329 self.0.store(val.into_glib(), atomic::Ordering::SeqCst) 330 } 331 swap(&self, val: ClockReturn) -> ClockReturn332 pub fn swap(&self, val: ClockReturn) -> ClockReturn { 333 unsafe { from_glib(self.0.swap(val.into_glib(), atomic::Ordering::SeqCst)) } 334 } 335 compare_exchange( &self, current: ClockReturn, new: ClockReturn, ) -> Result<ClockReturn, ClockReturn>336 pub fn compare_exchange( 337 &self, 338 current: ClockReturn, 339 new: ClockReturn, 340 ) -> Result<ClockReturn, ClockReturn> { 341 unsafe { 342 self.0 343 .compare_exchange( 344 current.into_glib(), 345 new.into_glib(), 346 atomic::Ordering::SeqCst, 347 atomic::Ordering::SeqCst, 348 ) 349 .map(|v| from_glib(v)) 350 .map_err(|v| from_glib(v)) 351 } 352 } 353 } 354 355 unsafe impl Send for ClockId {} 356 unsafe impl Sync for ClockId {} 357 358 impl Clock { 359 #[doc(alias = "gst_clock_adjust_with_calibration")] adjust_with_calibration( internal_target: ClockTime, cinternal: ClockTime, cexternal: ClockTime, cnum: ClockTime, cdenom: ClockTime, ) -> ClockTime360 pub fn adjust_with_calibration( 361 internal_target: ClockTime, 362 cinternal: ClockTime, 363 cexternal: ClockTime, 364 cnum: ClockTime, 365 cdenom: ClockTime, 366 ) -> ClockTime { 367 skip_assert_initialized!(); 368 unsafe { 369 try_from_glib(ffi::gst_clock_adjust_with_calibration( 370 ptr::null_mut(), 371 internal_target.into_glib(), 372 cinternal.into_glib(), 373 cexternal.into_glib(), 374 cnum.into_glib(), 375 cdenom.into_glib(), 376 )) 377 .expect("undefined ClockTime") 378 } 379 } 380 381 #[doc(alias = "gst_clock_unadjust_with_calibration")] unadjust_with_calibration( external_target: ClockTime, cinternal: ClockTime, cexternal: ClockTime, cnum: ClockTime, cdenom: ClockTime, ) -> ClockTime382 pub fn unadjust_with_calibration( 383 external_target: ClockTime, 384 cinternal: ClockTime, 385 cexternal: ClockTime, 386 cnum: ClockTime, 387 cdenom: ClockTime, 388 ) -> ClockTime { 389 skip_assert_initialized!(); 390 unsafe { 391 try_from_glib(ffi::gst_clock_unadjust_with_calibration( 392 ptr::null_mut(), 393 external_target.into_glib(), 394 cinternal.into_glib(), 395 cexternal.into_glib(), 396 cnum.into_glib(), 397 cdenom.into_glib(), 398 )) 399 .expect("undefined ClockTime") 400 } 401 } 402 } 403 404 pub trait ClockExtManual: 'static { 405 #[doc(alias = "gst_clock_new_periodic_id")] new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId406 fn new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId; 407 408 #[doc(alias = "gst_clock_periodic_id_reinit")] periodic_id_reinit( &self, id: &PeriodicClockId, start_time: ClockTime, interval: ClockTime, ) -> Result<(), glib::BoolError>409 fn periodic_id_reinit( 410 &self, 411 id: &PeriodicClockId, 412 start_time: ClockTime, 413 interval: ClockTime, 414 ) -> Result<(), glib::BoolError>; 415 416 #[doc(alias = "gst_clock_new_single_shot_id")] new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId417 fn new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId; 418 419 #[doc(alias = "gst_clock_single_shot_id_reinit")] single_shot_id_reinit( &self, id: &SingleShotClockId, time: ClockTime, ) -> Result<(), glib::BoolError>420 fn single_shot_id_reinit( 421 &self, 422 id: &SingleShotClockId, 423 time: ClockTime, 424 ) -> Result<(), glib::BoolError>; 425 set_clock_flags(&self, flags: ClockFlags)426 fn set_clock_flags(&self, flags: ClockFlags); 427 unset_clock_flags(&self, flags: ClockFlags)428 fn unset_clock_flags(&self, flags: ClockFlags); 429 430 #[doc(alias = "get_clock_flags")] clock_flags(&self) -> ClockFlags431 fn clock_flags(&self) -> ClockFlags; 432 } 433 434 impl<O: IsA<Clock>> ClockExtManual for O { new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId435 fn new_periodic_id(&self, start_time: ClockTime, interval: ClockTime) -> PeriodicClockId { 436 assert_ne!(interval, ClockTime::ZERO); 437 438 unsafe { 439 PeriodicClockId(from_glib_full(ffi::gst_clock_new_periodic_id( 440 self.as_ref().to_glib_none().0, 441 start_time.into_glib(), 442 interval.into_glib(), 443 ))) 444 } 445 } 446 periodic_id_reinit( &self, id: &PeriodicClockId, start_time: ClockTime, interval: ClockTime, ) -> Result<(), glib::BoolError>447 fn periodic_id_reinit( 448 &self, 449 id: &PeriodicClockId, 450 start_time: ClockTime, 451 interval: ClockTime, 452 ) -> Result<(), glib::BoolError> { 453 skip_assert_initialized!(); 454 unsafe { 455 let res: bool = from_glib(ffi::gst_clock_periodic_id_reinit( 456 self.as_ref().to_glib_none().0, 457 id.to_glib_none().0, 458 start_time.into_glib(), 459 interval.into_glib(), 460 )); 461 if res { 462 Ok(()) 463 } else { 464 Err(glib::bool_error!("Failed to reinit periodic clock id")) 465 } 466 } 467 } 468 new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId469 fn new_single_shot_id(&self, time: ClockTime) -> SingleShotClockId { 470 unsafe { 471 SingleShotClockId(from_glib_full(ffi::gst_clock_new_single_shot_id( 472 self.as_ref().to_glib_none().0, 473 time.into_glib(), 474 ))) 475 } 476 } 477 single_shot_id_reinit( &self, id: &SingleShotClockId, time: ClockTime, ) -> Result<(), glib::BoolError>478 fn single_shot_id_reinit( 479 &self, 480 id: &SingleShotClockId, 481 time: ClockTime, 482 ) -> Result<(), glib::BoolError> { 483 unsafe { 484 let res: bool = from_glib(ffi::gst_clock_single_shot_id_reinit( 485 self.as_ref().to_glib_none().0, 486 id.to_glib_none().0, 487 time.into_glib(), 488 )); 489 if res { 490 Ok(()) 491 } else { 492 Err(glib::bool_error!("Failed to reinit single shot clock id")) 493 } 494 } 495 } 496 set_clock_flags(&self, flags: ClockFlags)497 fn set_clock_flags(&self, flags: ClockFlags) { 498 unsafe { 499 let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _; 500 let _guard = crate::utils::MutexGuard::lock(&(*ptr).lock); 501 (*ptr).flags |= flags.into_glib(); 502 } 503 } 504 unset_clock_flags(&self, flags: ClockFlags)505 fn unset_clock_flags(&self, flags: ClockFlags) { 506 unsafe { 507 let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _; 508 let _guard = crate::utils::MutexGuard::lock(&(*ptr).lock); 509 (*ptr).flags &= !flags.into_glib(); 510 } 511 } 512 clock_flags(&self) -> ClockFlags513 fn clock_flags(&self) -> ClockFlags { 514 unsafe { 515 let ptr: *mut ffi::GstObject = self.as_ptr() as *mut _; 516 let _guard = crate::utils::MutexGuard::lock(&(*ptr).lock); 517 from_glib((*ptr).flags) 518 } 519 } 520 } 521 522 #[cfg(test)] 523 mod tests { 524 use super::super::prelude::*; 525 use super::super::*; 526 use super::*; 527 use std::sync::mpsc::channel; 528 529 #[test] test_wait()530 fn test_wait() { 531 crate::init().unwrap(); 532 533 let clock = SystemClock::obtain(); 534 let now = clock.time().unwrap(); 535 let id = clock.new_single_shot_id(now + 20 * ClockTime::MSECOND); 536 let (res, _) = id.wait(); 537 538 assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early)); 539 } 540 541 #[test] test_wait_async()542 fn test_wait_async() { 543 crate::init().unwrap(); 544 545 let (sender, receiver) = channel(); 546 547 let clock = SystemClock::obtain(); 548 let now = clock.time().unwrap(); 549 let id = clock.new_single_shot_id(now + 20 * ClockTime::MSECOND); 550 let res = id.wait_async(move |_, _, _| { 551 sender.send(()).unwrap(); 552 }); 553 554 assert!(res == Ok(ClockSuccess::Ok)); 555 556 assert_eq!(receiver.recv(), Ok(())); 557 } 558 559 #[test] test_wait_periodic()560 fn test_wait_periodic() { 561 crate::init().unwrap(); 562 563 let clock = SystemClock::obtain(); 564 let now = clock.time().unwrap(); 565 let id = clock.new_periodic_id(now + 20 * ClockTime::MSECOND, 20 * ClockTime::MSECOND); 566 567 let (res, _) = id.wait(); 568 assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early)); 569 570 let (res, _) = id.wait(); 571 assert!(res == Ok(ClockSuccess::Ok) || res == Err(ClockError::Early)); 572 } 573 574 #[test] test_wait_async_periodic()575 fn test_wait_async_periodic() { 576 crate::init().unwrap(); 577 578 let (sender, receiver) = channel(); 579 580 let clock = SystemClock::obtain(); 581 let now = clock.time().unwrap(); 582 let id = clock.new_periodic_id(now + 20 * ClockTime::MSECOND, 20 * ClockTime::MSECOND); 583 let res = id.wait_async(move |_, _, _| { 584 let _ = sender.send(()); 585 }); 586 587 assert!(res == Ok(ClockSuccess::Ok)); 588 589 assert_eq!(receiver.recv(), Ok(())); 590 assert_eq!(receiver.recv(), Ok(())); 591 } 592 } 593