1 // Copyright (C) 2018 Sebastian Dröge <sebastian@centricular.com> 2 // 3 // This library is free software; you can redistribute it and/or 4 // modify it under the terms of the GNU Library General Public 5 // License as published by the Free Software Foundation; either 6 // version 2 of the License, or (at your option) any later version. 7 // 8 // This library is distributed in the hope that it will be useful, 9 // but WITHOUT ANY WARRANTY; without even the implied warranty of 10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 11 // Library General Public License for more details. 12 // 13 // You should have received a copy of the GNU Library General Public 14 // License along with this library; if not, write to the 15 // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, 16 // Boston, MA 02110-1335, USA. 17 18 use futures::channel::oneshot; 19 use futures::future::BoxFuture; 20 use futures::prelude::*; 21 22 use gst::glib; 23 use gst::prelude::*; 24 use gst::subclass::prelude::*; 25 use gst::{gst_debug, gst_error, gst_log, gst_trace}; 26 27 use once_cell::sync::Lazy; 28 29 use std::collections::VecDeque; 30 use std::sync::Mutex as StdMutex; 31 use std::time::Duration; 32 use std::{u32, u64}; 33 34 use crate::runtime::prelude::*; 35 use crate::runtime::{Context, PadSink, PadSinkRef, PadSrc, PadSrcRef, PadSrcWeak, Task}; 36 37 use crate::dataqueue::{DataQueue, DataQueueItem}; 38 39 const DEFAULT_MAX_SIZE_BUFFERS: u32 = 200; 40 const DEFAULT_MAX_SIZE_BYTES: u32 = 1024 * 1024; 41 const DEFAULT_MAX_SIZE_TIME: gst::ClockTime = gst::ClockTime::SECOND; 42 const DEFAULT_CONTEXT: &str = ""; 43 // FIXME use Duration::ZERO when MSVC >= 1.53.2 44 const DEFAULT_CONTEXT_WAIT: Duration = Duration::from_nanos(0); 45 46 #[derive(Debug, Clone)] 47 struct Settings { 48 max_size_buffers: u32, 49 max_size_bytes: u32, 50 max_size_time: gst::ClockTime, 51 context: String, 52 context_wait: Duration, 53 } 54 55 impl Default for Settings { default() -> Self56 fn default() -> Self { 57 Settings { 58 max_size_buffers: DEFAULT_MAX_SIZE_BUFFERS, 59 max_size_bytes: DEFAULT_MAX_SIZE_BYTES, 60 max_size_time: DEFAULT_MAX_SIZE_TIME, 61 context: DEFAULT_CONTEXT.into(), 62 context_wait: DEFAULT_CONTEXT_WAIT, 63 } 64 } 65 } 66 67 #[derive(Debug)] 68 struct PendingQueue { 69 more_queue_space_sender: Option<oneshot::Sender<()>>, 70 scheduled: bool, 71 items: VecDeque<DataQueueItem>, 72 } 73 74 impl PendingQueue { notify_more_queue_space(&mut self)75 fn notify_more_queue_space(&mut self) { 76 self.more_queue_space_sender.take(); 77 } 78 } 79 80 #[derive(Clone)] 81 struct QueuePadSinkHandler; 82 83 impl PadSinkHandler for QueuePadSinkHandler { 84 type ElementImpl = Queue; 85 sink_chain( &self, pad: &PadSinkRef, _queue: &Queue, element: &gst::Element, buffer: gst::Buffer, ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>>86 fn sink_chain( 87 &self, 88 pad: &PadSinkRef, 89 _queue: &Queue, 90 element: &gst::Element, 91 buffer: gst::Buffer, 92 ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { 93 let pad_weak = pad.downgrade(); 94 let element = element.clone().downcast::<super::Queue>().unwrap(); 95 async move { 96 let pad = pad_weak.upgrade().expect("PadSink no longer exists"); 97 gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", buffer); 98 let queue = Queue::from_instance(&element); 99 queue 100 .enqueue_item(&element, DataQueueItem::Buffer(buffer)) 101 .await 102 } 103 .boxed() 104 } 105 sink_chain_list( &self, pad: &PadSinkRef, _queue: &Queue, element: &gst::Element, list: gst::BufferList, ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>>106 fn sink_chain_list( 107 &self, 108 pad: &PadSinkRef, 109 _queue: &Queue, 110 element: &gst::Element, 111 list: gst::BufferList, 112 ) -> BoxFuture<'static, Result<gst::FlowSuccess, gst::FlowError>> { 113 let pad_weak = pad.downgrade(); 114 let element = element.clone().downcast::<super::Queue>().unwrap(); 115 async move { 116 let pad = pad_weak.upgrade().expect("PadSink no longer exists"); 117 gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", list); 118 let queue = Queue::from_instance(&element); 119 queue 120 .enqueue_item(&element, DataQueueItem::BufferList(list)) 121 .await 122 } 123 .boxed() 124 } 125 sink_event( &self, pad: &PadSinkRef, queue: &Queue, element: &gst::Element, event: gst::Event, ) -> bool126 fn sink_event( 127 &self, 128 pad: &PadSinkRef, 129 queue: &Queue, 130 element: &gst::Element, 131 event: gst::Event, 132 ) -> bool { 133 use gst::EventView; 134 135 gst_debug!(CAT, obj: pad.gst_pad(), "Handling non-serialized {:?}", event); 136 137 if let EventView::FlushStart(..) = event.view() { 138 if let Err(err) = queue.task.flush_start() { 139 gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); 140 gst::element_error!( 141 element, 142 gst::StreamError::Failed, 143 ("Internal data stream error"), 144 ["FlushStart failed {:?}", err] 145 ); 146 return false; 147 } 148 } 149 150 gst_log!(CAT, obj: pad.gst_pad(), "Forwarding non-serialized {:?}", event); 151 queue.src_pad.gst_pad().push_event(event) 152 } 153 sink_event_serialized( &self, pad: &PadSinkRef, _queue: &Queue, element: &gst::Element, event: gst::Event, ) -> BoxFuture<'static, bool>154 fn sink_event_serialized( 155 &self, 156 pad: &PadSinkRef, 157 _queue: &Queue, 158 element: &gst::Element, 159 event: gst::Event, 160 ) -> BoxFuture<'static, bool> { 161 use gst::EventView; 162 163 gst_log!(CAT, obj: pad.gst_pad(), "Handling serialized {:?}", event); 164 165 let pad_weak = pad.downgrade(); 166 let element = element.clone().downcast::<super::Queue>().unwrap(); 167 async move { 168 let pad = pad_weak.upgrade().expect("PadSink no longer exists"); 169 let queue = Queue::from_instance(&element); 170 171 if let EventView::FlushStop(..) = event.view() { 172 if let Err(err) = queue.task.flush_stop() { 173 gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); 174 gst::element_error!( 175 element, 176 gst::StreamError::Failed, 177 ("Internal data stream error"), 178 ["FlushStop failed {:?}", err] 179 ); 180 return false; 181 } 182 } 183 184 gst_log!(CAT, obj: pad.gst_pad(), "Queuing serialized {:?}", event); 185 queue 186 .enqueue_item(&element, DataQueueItem::Event(event)) 187 .await 188 .is_ok() 189 } 190 .boxed() 191 } 192 sink_query( &self, pad: &PadSinkRef, queue: &Queue, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool193 fn sink_query( 194 &self, 195 pad: &PadSinkRef, 196 queue: &Queue, 197 _element: &gst::Element, 198 query: &mut gst::QueryRef, 199 ) -> bool { 200 gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); 201 202 if query.is_serialized() { 203 // FIXME: How can we do this? 204 gst_log!(CAT, obj: pad.gst_pad(), "Dropping serialized {:?}", query); 205 false 206 } else { 207 gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); 208 queue.src_pad.gst_pad().peer_query(query) 209 } 210 } 211 } 212 213 #[derive(Clone, Debug)] 214 struct QueuePadSrcHandler; 215 216 impl QueuePadSrcHandler { push_item( pad: &PadSrcRef<'_>, queue: &Queue, item: DataQueueItem, ) -> Result<(), gst::FlowError>217 async fn push_item( 218 pad: &PadSrcRef<'_>, 219 queue: &Queue, 220 item: DataQueueItem, 221 ) -> Result<(), gst::FlowError> { 222 if let Some(pending_queue) = queue.pending_queue.lock().unwrap().as_mut() { 223 pending_queue.notify_more_queue_space(); 224 } 225 226 match item { 227 DataQueueItem::Buffer(buffer) => { 228 gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", buffer); 229 pad.push(buffer).await.map(drop) 230 } 231 DataQueueItem::BufferList(list) => { 232 gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", list); 233 pad.push_list(list).await.map(drop) 234 } 235 DataQueueItem::Event(event) => { 236 gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); 237 pad.push_event(event).await; 238 Ok(()) 239 } 240 } 241 } 242 } 243 244 impl PadSrcHandler for QueuePadSrcHandler { 245 type ElementImpl = Queue; 246 src_event( &self, pad: &PadSrcRef, queue: &Queue, element: &gst::Element, event: gst::Event, ) -> bool247 fn src_event( 248 &self, 249 pad: &PadSrcRef, 250 queue: &Queue, 251 element: &gst::Element, 252 event: gst::Event, 253 ) -> bool { 254 use gst::EventView; 255 256 gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", event); 257 258 match event.view() { 259 EventView::FlushStart(..) => { 260 if let Err(err) = queue.task.flush_start() { 261 gst_error!(CAT, obj: pad.gst_pad(), "FlushStart failed {:?}", err); 262 } 263 } 264 EventView::FlushStop(..) => { 265 if let Err(err) = queue.task.flush_stop() { 266 gst_error!(CAT, obj: pad.gst_pad(), "FlushStop failed {:?}", err); 267 gst::element_error!( 268 element, 269 gst::StreamError::Failed, 270 ("Internal data stream error"), 271 ["FlushStop failed {:?}", err] 272 ); 273 return false; 274 } 275 } 276 _ => (), 277 } 278 279 gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", event); 280 queue.sink_pad.gst_pad().push_event(event) 281 } 282 src_query( &self, pad: &PadSrcRef, queue: &Queue, _element: &gst::Element, query: &mut gst::QueryRef, ) -> bool283 fn src_query( 284 &self, 285 pad: &PadSrcRef, 286 queue: &Queue, 287 _element: &gst::Element, 288 query: &mut gst::QueryRef, 289 ) -> bool { 290 use gst::QueryView; 291 292 gst_log!(CAT, obj: pad.gst_pad(), "Handling {:?}", query); 293 294 if let QueryView::Scheduling(ref mut q) = query.view_mut() { 295 let mut new_query = gst::query::Scheduling::new(); 296 let res = queue.sink_pad.gst_pad().peer_query(&mut new_query); 297 if !res { 298 return res; 299 } 300 301 gst_log!(CAT, obj: pad.gst_pad(), "Upstream returned {:?}", new_query); 302 303 let (flags, min, max, align) = new_query.result(); 304 q.set(flags, min, max, align); 305 q.add_scheduling_modes( 306 &new_query 307 .scheduling_modes() 308 .iter() 309 .cloned() 310 .filter(|m| m != &gst::PadMode::Pull) 311 .collect::<Vec<_>>(), 312 ); 313 gst_log!(CAT, obj: pad.gst_pad(), "Returning {:?}", q.query_mut()); 314 return true; 315 } 316 317 gst_log!(CAT, obj: pad.gst_pad(), "Forwarding {:?}", query); 318 queue.sink_pad.gst_pad().peer_query(query) 319 } 320 } 321 322 #[derive(Debug)] 323 struct QueueTask { 324 element: super::Queue, 325 src_pad: PadSrcWeak, 326 dataqueue: DataQueue, 327 } 328 329 impl QueueTask { new(element: &super::Queue, src_pad: &PadSrc, dataqueue: DataQueue) -> Self330 fn new(element: &super::Queue, src_pad: &PadSrc, dataqueue: DataQueue) -> Self { 331 QueueTask { 332 element: element.clone(), 333 src_pad: src_pad.downgrade(), 334 dataqueue, 335 } 336 } 337 } 338 339 impl TaskImpl for QueueTask { start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>>340 fn start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { 341 async move { 342 gst_log!(CAT, obj: &self.element, "Starting task"); 343 344 let queue = Queue::from_instance(&self.element); 345 let mut last_res = queue.last_res.lock().unwrap(); 346 347 self.dataqueue.start(); 348 349 *last_res = Ok(gst::FlowSuccess::Ok); 350 351 gst_log!(CAT, obj: &self.element, "Task started"); 352 Ok(()) 353 } 354 .boxed() 355 } 356 iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>>357 fn iterate(&mut self) -> BoxFuture<'_, Result<(), gst::FlowError>> { 358 async move { 359 let item = self.dataqueue.next().await; 360 361 let item = match item { 362 Some(item) => item, 363 None => { 364 gst_log!(CAT, obj: &self.element, "DataQueue Stopped"); 365 return Err(gst::FlowError::Flushing); 366 } 367 }; 368 369 let pad = self.src_pad.upgrade().expect("PadSrc no longer exists"); 370 let queue = Queue::from_instance(&self.element); 371 let res = QueuePadSrcHandler::push_item(&pad, queue, item).await; 372 match res { 373 Ok(()) => { 374 gst_log!(CAT, obj: &self.element, "Successfully pushed item"); 375 *queue.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok); 376 } 377 Err(gst::FlowError::Flushing) => { 378 gst_debug!(CAT, obj: &self.element, "Flushing"); 379 *queue.last_res.lock().unwrap() = Err(gst::FlowError::Flushing); 380 } 381 Err(gst::FlowError::Eos) => { 382 gst_debug!(CAT, obj: &self.element, "EOS"); 383 *queue.last_res.lock().unwrap() = Err(gst::FlowError::Eos); 384 pad.push_event(gst::event::Eos::new()).await; 385 } 386 Err(err) => { 387 gst_error!(CAT, obj: &self.element, "Got error {}", err); 388 gst::element_error!( 389 &self.element, 390 gst::StreamError::Failed, 391 ("Internal data stream error"), 392 ["streaming stopped, reason {}", err] 393 ); 394 *queue.last_res.lock().unwrap() = Err(err); 395 } 396 } 397 398 res 399 } 400 .boxed() 401 } 402 stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>>403 fn stop(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { 404 async move { 405 gst_log!(CAT, obj: &self.element, "Stopping task"); 406 407 let queue = Queue::from_instance(&self.element); 408 let mut last_res = queue.last_res.lock().unwrap(); 409 410 self.dataqueue.stop(); 411 self.dataqueue.clear(); 412 413 if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() { 414 pending_queue.notify_more_queue_space(); 415 } 416 417 *last_res = Err(gst::FlowError::Flushing); 418 419 gst_log!(CAT, obj: &self.element, "Task stopped"); 420 Ok(()) 421 } 422 .boxed() 423 } 424 flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>>425 fn flush_start(&mut self) -> BoxFuture<'_, Result<(), gst::ErrorMessage>> { 426 async move { 427 gst_log!(CAT, obj: &self.element, "Starting task flush"); 428 429 let queue = Queue::from_instance(&self.element); 430 let mut last_res = queue.last_res.lock().unwrap(); 431 432 self.dataqueue.clear(); 433 434 if let Some(mut pending_queue) = queue.pending_queue.lock().unwrap().take() { 435 pending_queue.notify_more_queue_space(); 436 } 437 438 *last_res = Err(gst::FlowError::Flushing); 439 440 gst_log!(CAT, obj: &self.element, "Task flush started"); 441 Ok(()) 442 } 443 .boxed() 444 } 445 } 446 447 #[derive(Debug)] 448 pub struct Queue { 449 sink_pad: PadSink, 450 src_pad: PadSrc, 451 task: Task, 452 dataqueue: StdMutex<Option<DataQueue>>, 453 pending_queue: StdMutex<Option<PendingQueue>>, 454 last_res: StdMutex<Result<gst::FlowSuccess, gst::FlowError>>, 455 settings: StdMutex<Settings>, 456 } 457 458 static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { 459 gst::DebugCategory::new( 460 "ts-queue", 461 gst::DebugColorFlags::empty(), 462 Some("Thread-sharing queue"), 463 ) 464 }); 465 466 impl Queue { 467 /* Try transfering all the items from the pending queue to the DataQueue, then 468 * the current item. Errors out if the DataQueue was full, or the pending queue 469 * is already scheduled, in which case the current item should be added to the 470 * pending queue */ queue_until_full( &self, dataqueue: &DataQueue, pending_queue: &mut Option<PendingQueue>, item: DataQueueItem, ) -> Result<(), DataQueueItem>471 fn queue_until_full( 472 &self, 473 dataqueue: &DataQueue, 474 pending_queue: &mut Option<PendingQueue>, 475 item: DataQueueItem, 476 ) -> Result<(), DataQueueItem> { 477 match pending_queue { 478 None => dataqueue.push(item), 479 Some(PendingQueue { 480 scheduled: false, 481 ref mut items, 482 .. 483 }) => { 484 let mut failed_item = None; 485 while let Some(item) = items.pop_front() { 486 if let Err(item) = dataqueue.push(item) { 487 failed_item = Some(item); 488 } 489 } 490 491 if let Some(failed_item) = failed_item { 492 items.push_front(failed_item); 493 494 Err(item) 495 } else { 496 dataqueue.push(item) 497 } 498 } 499 _ => Err(item), 500 } 501 } 502 503 /* Schedules emptying of the pending queue. If there is an upstream 504 * TaskContext, the new task is spawned, it is otherwise 505 * returned, for the caller to block on */ schedule_pending_queue(&self, element: &super::Queue)506 async fn schedule_pending_queue(&self, element: &super::Queue) { 507 loop { 508 let more_queue_space_receiver = { 509 let dataqueue = self.dataqueue.lock().unwrap(); 510 if dataqueue.is_none() { 511 return; 512 } 513 let mut pending_queue_grd = self.pending_queue.lock().unwrap(); 514 515 gst_log!(CAT, obj: element, "Trying to empty pending queue"); 516 517 if let Some(pending_queue) = pending_queue_grd.as_mut() { 518 let mut failed_item = None; 519 while let Some(item) = pending_queue.items.pop_front() { 520 if let Err(item) = dataqueue.as_ref().unwrap().push(item) { 521 failed_item = Some(item); 522 } 523 } 524 525 if let Some(failed_item) = failed_item { 526 pending_queue.items.push_front(failed_item); 527 let (sender, receiver) = oneshot::channel(); 528 pending_queue.more_queue_space_sender = Some(sender); 529 530 receiver 531 } else { 532 gst_log!(CAT, obj: element, "Pending queue is empty now"); 533 *pending_queue_grd = None; 534 return; 535 } 536 } else { 537 gst_log!(CAT, obj: element, "Flushing, dropping pending queue"); 538 return; 539 } 540 }; 541 542 gst_log!(CAT, obj: element, "Waiting for more queue space"); 543 let _ = more_queue_space_receiver.await; 544 } 545 } 546 enqueue_item( &self, element: &super::Queue, item: DataQueueItem, ) -> Result<gst::FlowSuccess, gst::FlowError>547 async fn enqueue_item( 548 &self, 549 element: &super::Queue, 550 item: DataQueueItem, 551 ) -> Result<gst::FlowSuccess, gst::FlowError> { 552 let wait_fut = { 553 let dataqueue = self.dataqueue.lock().unwrap(); 554 let dataqueue = dataqueue.as_ref().ok_or_else(|| { 555 gst_error!(CAT, obj: element, "No DataQueue"); 556 gst::FlowError::Error 557 })?; 558 559 let mut pending_queue = self.pending_queue.lock().unwrap(); 560 561 if let Err(item) = self.queue_until_full(dataqueue, &mut pending_queue, item) { 562 if pending_queue 563 .as_ref() 564 .map(|pq| !pq.scheduled) 565 .unwrap_or(true) 566 { 567 if pending_queue.is_none() { 568 *pending_queue = Some(PendingQueue { 569 more_queue_space_sender: None, 570 scheduled: false, 571 items: VecDeque::new(), 572 }); 573 } 574 575 let schedule_now = !matches!( 576 item, 577 DataQueueItem::Event(ref ev) if ev.type_() != gst::EventType::Eos, 578 ); 579 580 pending_queue.as_mut().unwrap().items.push_back(item); 581 582 gst_log!( 583 CAT, 584 obj: element, 585 "Queue is full - Pushing first item on pending queue" 586 ); 587 588 if schedule_now { 589 gst_log!(CAT, obj: element, "Scheduling pending queue now"); 590 pending_queue.as_mut().unwrap().scheduled = true; 591 592 let wait_fut = self.schedule_pending_queue(element); 593 Some(wait_fut) 594 } else { 595 gst_log!(CAT, obj: element, "Scheduling pending queue later"); 596 None 597 } 598 } else { 599 pending_queue.as_mut().unwrap().items.push_back(item); 600 None 601 } 602 } else { 603 None 604 } 605 }; 606 607 if let Some(wait_fut) = wait_fut { 608 gst_log!(CAT, obj: element, "Blocking until queue has space again"); 609 wait_fut.await; 610 } 611 612 *self.last_res.lock().unwrap() 613 } 614 prepare(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage>615 fn prepare(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> { 616 gst_debug!(CAT, obj: element, "Preparing"); 617 618 let settings = self.settings.lock().unwrap().clone(); 619 620 let dataqueue = DataQueue::new( 621 &element.clone().upcast(), 622 self.src_pad.gst_pad(), 623 if settings.max_size_buffers == 0 { 624 None 625 } else { 626 Some(settings.max_size_buffers) 627 }, 628 if settings.max_size_bytes == 0 { 629 None 630 } else { 631 Some(settings.max_size_bytes) 632 }, 633 if settings.max_size_time.is_zero() { 634 None 635 } else { 636 Some(settings.max_size_time) 637 }, 638 ); 639 640 *self.dataqueue.lock().unwrap() = Some(dataqueue.clone()); 641 642 let context = 643 Context::acquire(&settings.context, settings.context_wait).map_err(|err| { 644 gst::error_msg!( 645 gst::ResourceError::OpenRead, 646 ["Failed to acquire Context: {}", err] 647 ) 648 })?; 649 650 self.task 651 .prepare(QueueTask::new(element, &self.src_pad, dataqueue), context) 652 .map_err(|err| { 653 gst::error_msg!( 654 gst::ResourceError::OpenRead, 655 ["Error preparing Task: {:?}", err] 656 ) 657 })?; 658 659 gst_debug!(CAT, obj: element, "Prepared"); 660 661 Ok(()) 662 } 663 unprepare(&self, element: &super::Queue)664 fn unprepare(&self, element: &super::Queue) { 665 gst_debug!(CAT, obj: element, "Unpreparing"); 666 667 self.task.unprepare().unwrap(); 668 669 *self.dataqueue.lock().unwrap() = None; 670 *self.pending_queue.lock().unwrap() = None; 671 672 *self.last_res.lock().unwrap() = Ok(gst::FlowSuccess::Ok); 673 674 gst_debug!(CAT, obj: element, "Unprepared"); 675 } 676 stop(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage>677 fn stop(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> { 678 gst_debug!(CAT, obj: element, "Stopping"); 679 self.task.stop()?; 680 gst_debug!(CAT, obj: element, "Stopped"); 681 Ok(()) 682 } 683 start(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage>684 fn start(&self, element: &super::Queue) -> Result<(), gst::ErrorMessage> { 685 gst_debug!(CAT, obj: element, "Starting"); 686 self.task.start()?; 687 gst_debug!(CAT, obj: element, "Started"); 688 Ok(()) 689 } 690 } 691 692 #[glib::object_subclass] 693 impl ObjectSubclass for Queue { 694 const NAME: &'static str = "RsTsQueue"; 695 type Type = super::Queue; 696 type ParentType = gst::Element; 697 with_class(klass: &Self::Class) -> Self698 fn with_class(klass: &Self::Class) -> Self { 699 Self { 700 sink_pad: PadSink::new( 701 gst::Pad::from_template(&klass.pad_template("sink").unwrap(), Some("sink")), 702 QueuePadSinkHandler, 703 ), 704 src_pad: PadSrc::new( 705 gst::Pad::from_template(&klass.pad_template("src").unwrap(), Some("src")), 706 QueuePadSrcHandler, 707 ), 708 task: Task::default(), 709 dataqueue: StdMutex::new(None), 710 pending_queue: StdMutex::new(None), 711 last_res: StdMutex::new(Ok(gst::FlowSuccess::Ok)), 712 settings: StdMutex::new(Settings::default()), 713 } 714 } 715 } 716 717 impl ObjectImpl for Queue { properties() -> &'static [glib::ParamSpec]718 fn properties() -> &'static [glib::ParamSpec] { 719 static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { 720 vec![ 721 glib::ParamSpec::new_string( 722 "context", 723 "Context", 724 "Context name to share threads with", 725 Some(DEFAULT_CONTEXT), 726 glib::ParamFlags::READWRITE, 727 ), 728 glib::ParamSpec::new_uint( 729 "context-wait", 730 "Context Wait", 731 "Throttle poll loop to run at most once every this many ms", 732 0, 733 1000, 734 DEFAULT_CONTEXT_WAIT.as_millis() as u32, 735 glib::ParamFlags::READWRITE, 736 ), 737 glib::ParamSpec::new_uint( 738 "max-size-buffers", 739 "Max Size Buffers", 740 "Maximum number of buffers to queue (0=unlimited)", 741 0, 742 u32::MAX, 743 DEFAULT_MAX_SIZE_BUFFERS, 744 glib::ParamFlags::READWRITE, 745 ), 746 glib::ParamSpec::new_uint( 747 "max-size-bytes", 748 "Max Size Bytes", 749 "Maximum number of bytes to queue (0=unlimited)", 750 0, 751 u32::MAX, 752 DEFAULT_MAX_SIZE_BYTES, 753 glib::ParamFlags::READWRITE, 754 ), 755 glib::ParamSpec::new_uint64( 756 "max-size-time", 757 "Max Size Time", 758 "Maximum number of nanoseconds to queue (0=unlimited)", 759 0, 760 u64::MAX - 1, 761 DEFAULT_MAX_SIZE_TIME.nseconds(), 762 glib::ParamFlags::READWRITE, 763 ), 764 ] 765 }); 766 767 PROPERTIES.as_ref() 768 } 769 set_property( &self, _obj: &Self::Type, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec, )770 fn set_property( 771 &self, 772 _obj: &Self::Type, 773 _id: usize, 774 value: &glib::Value, 775 pspec: &glib::ParamSpec, 776 ) { 777 let mut settings = self.settings.lock().unwrap(); 778 match pspec.name() { 779 "max-size-buffers" => { 780 settings.max_size_buffers = value.get().expect("type checked upstream"); 781 } 782 "max-size-bytes" => { 783 settings.max_size_bytes = value.get().expect("type checked upstream"); 784 } 785 "max-size-time" => { 786 settings.max_size_time = 787 gst::ClockTime::from_nseconds(value.get().expect("type checked upstream")); 788 } 789 "context" => { 790 settings.context = value 791 .get::<Option<String>>() 792 .expect("type checked upstream") 793 .unwrap_or_else(|| "".into()); 794 } 795 "context-wait" => { 796 settings.context_wait = Duration::from_millis( 797 value.get::<u32>().expect("type checked upstream").into(), 798 ); 799 } 800 _ => unimplemented!(), 801 } 802 } 803 property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value804 fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { 805 let settings = self.settings.lock().unwrap(); 806 match pspec.name() { 807 "max-size-buffers" => settings.max_size_buffers.to_value(), 808 "max-size-bytes" => settings.max_size_bytes.to_value(), 809 "max-size-time" => settings.max_size_time.nseconds().to_value(), 810 "context" => settings.context.to_value(), 811 "context-wait" => (settings.context_wait.as_millis() as u32).to_value(), 812 _ => unimplemented!(), 813 } 814 } 815 constructed(&self, obj: &Self::Type)816 fn constructed(&self, obj: &Self::Type) { 817 self.parent_constructed(obj); 818 819 obj.add_pad(self.sink_pad.gst_pad()).unwrap(); 820 obj.add_pad(self.src_pad.gst_pad()).unwrap(); 821 } 822 } 823 824 impl ElementImpl for Queue { metadata() -> Option<&'static gst::subclass::ElementMetadata>825 fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { 826 static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { 827 gst::subclass::ElementMetadata::new( 828 "Thread-sharing queue", 829 "Generic", 830 "Simple data queue", 831 "Sebastian Dröge <sebastian@centricular.com>", 832 ) 833 }); 834 835 Some(&*ELEMENT_METADATA) 836 } 837 pad_templates() -> &'static [gst::PadTemplate]838 fn pad_templates() -> &'static [gst::PadTemplate] { 839 static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { 840 let caps = gst::Caps::new_any(); 841 842 let sink_pad_template = gst::PadTemplate::new( 843 "sink", 844 gst::PadDirection::Sink, 845 gst::PadPresence::Always, 846 &caps, 847 ) 848 .unwrap(); 849 850 let src_pad_template = gst::PadTemplate::new( 851 "src", 852 gst::PadDirection::Src, 853 gst::PadPresence::Always, 854 &caps, 855 ) 856 .unwrap(); 857 858 vec![sink_pad_template, src_pad_template] 859 }); 860 861 PAD_TEMPLATES.as_ref() 862 } 863 change_state( &self, element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError>864 fn change_state( 865 &self, 866 element: &Self::Type, 867 transition: gst::StateChange, 868 ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { 869 gst_trace!(CAT, obj: element, "Changing state {:?}", transition); 870 871 match transition { 872 gst::StateChange::NullToReady => { 873 self.prepare(element).map_err(|err| { 874 element.post_error_message(err); 875 gst::StateChangeError 876 })?; 877 } 878 gst::StateChange::PausedToReady => { 879 self.stop(element).map_err(|_| gst::StateChangeError)?; 880 } 881 gst::StateChange::ReadyToNull => { 882 self.unprepare(element); 883 } 884 _ => (), 885 } 886 887 let success = self.parent_change_state(element, transition)?; 888 889 if transition == gst::StateChange::ReadyToPaused { 890 self.start(element).map_err(|_| gst::StateChangeError)?; 891 } 892 893 Ok(success) 894 } 895 } 896