1 // Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
2 // Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Library General Public
6 // License as published by the Free Software Foundation; either
7 // version 2 of the License, or (at your option) any later version.
8 //
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 // Library General Public License for more details.
13 //
14 // You should have received a copy of the GNU Library General Public
15 // License along with this library; if not, write to the
16 // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17 // Boston, MA 02110-1335, USA.
18
19 //! An implementation of `Pad`s to run asynchronous processings.
20 //!
21 //! [`PadSink`] & [`PadSrc`] provide an asynchronous API to ease the development of `Element`s in
22 //! the `threadshare` GStreamer plugins framework.
23 //!
24 //! The diagram below shows how the [`PadSrc`] & [`PadSink`] and the related `struct`s integrate in
25 //! `ts` `Element`s.
26 //!
27 //! Note: [`PadSrc`] & [`PadSink`] only support `gst::PadMode::Push` at the moment.
28 //!
29 //! ```text
30 //! ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━
31 //! Element A ┃ ┃ Element B
32 //! ┃ ┃
33 //! ╭─────────────────╮ ┃ ┃ ╭──────────────────╮
34 //! │ PadSrc │ ┃ ┃ │ PadSink │
35 //! │ Handler │ ┃ ┃ │ Handler │
36 //! │─────────────────│ ┃ ┃ │──────────────────│
37 //! │ - src_activate* │ ╭──┸──╮ ╭──┸──╮ │ - sink_activate* │
38 //! │ - src_event* │<────│ │<╌╌╌│ │───>│ - sink_chain* │
39 //! │ - src_query │<────│ gst │ │ gst │───>│ - sink_event* │
40 //! │─────────────────│ │ │ │ │───>│ - sink_query │
41 //! │ - task fn │ │ Pad │ │ Pad │ ╰──────────────────╯
42 //! ╰─────────────────╯ ╭─>│ │╌╌╌>│ │─╮ │
43 //! ╭───────╯ │ │ ╰──┰──╯ ╰──┰──╯ ╰───────╮ │
44 //! ╭────────────╮ ╭────────╮ push* │ ┃ ┃ ╭─────────╮
45 //! │ Pad Task ↺ │──>│ PadSrc │───────╯ ┃ ┃ │ PadSink │
46 //! ╰────────────╯ ╰────────╯ ┃ ┃ ╰─────────╯
47 //! ━━━━━━━│━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛ ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━
48 //! ╰──────────────┴───────────────────╮ ╭─────────────────╯
49 //! ╭────────────╮
50 //! │ Context ↺ │
51 //! ╰────────────╯
52 //! ```
53 //!
54 //! Asynchronous operations for both [`PadSrc`] in `Element A` and [`PadSink`] in `Element B` run on
55 //! the same [`Context`], which can also be shared by other `Element`s or instances of the same
56 //! `Element`s in multiple `Pipeline`s.
57 //!
58 //! `Element A` & `Element B` can also be linked to non-threadshare `Element`s in which case, they
59 //! operate in a regular synchronous way.
60 //!
61 //! Note that only operations on the streaming thread (serialized events, buffers, serialized
62 //! queries) are handled from the `PadContext` and asynchronously, everything else operates
63 //! blocking.
64 //!
65 //! [`PadSink`]: struct.PadSink.html
66 //! [`PadSrc`]: struct.PadSrc.html
67 //! [`Context`]: ../executor/struct.Context.html
68
69 use futures::future;
70 use futures::future::BoxFuture;
71 use futures::prelude::*;
72
73 use gst::prelude::*;
74 use gst::subclass::prelude::*;
75 use gst::{gst_debug, gst_error, gst_fixme, gst_log};
76 use gst::{FlowError, FlowSuccess};
77
78 use std::marker::PhantomData;
79 use std::ops::Deref;
80 use std::sync::{Arc, Weak};
81
82 use super::executor::{block_on_or_add_sub_task, Context};
83 use super::RUNTIME_CAT;
84
85 #[inline]
event_ret_to_event_full_res( ret: bool, event_type: gst::EventType, ) -> Result<FlowSuccess, FlowError>86 fn event_ret_to_event_full_res(
87 ret: bool,
88 event_type: gst::EventType,
89 ) -> Result<FlowSuccess, FlowError> {
90 if ret {
91 Ok(FlowSuccess::Ok)
92 } else if event_type == gst::EventType::Caps {
93 Err(FlowError::NotNegotiated)
94 } else {
95 Err(FlowError::Error)
96 }
97 }
98
99 #[inline]
event_to_event_full(ret: bool, event_type: gst::EventType) -> Result<FlowSuccess, FlowError>100 fn event_to_event_full(ret: bool, event_type: gst::EventType) -> Result<FlowSuccess, FlowError> {
101 event_ret_to_event_full_res(ret, event_type)
102 }
103
104 #[inline]
event_to_event_full_serialized( ret: BoxFuture<'static, bool>, event_type: gst::EventType, ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>>105 fn event_to_event_full_serialized(
106 ret: BoxFuture<'static, bool>,
107 event_type: gst::EventType,
108 ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
109 ret.map(move |ret| event_ret_to_event_full_res(ret, event_type))
110 .boxed()
111 }
112
113 /// A trait to define `handler`s for [`PadSrc`] callbacks.
114 ///
115 /// *See the [`pad` module] documentation for a description of the model.*
116 ///
117 /// [`PadSrc`]: struct.PadSrc.html
118 /// [`pad` module]: index.html
119 pub trait PadSrcHandler: Clone + Send + Sync + 'static {
120 type ElementImpl: ElementImpl + ObjectSubclass;
121
src_activate( &self, pad: &PadSrcRef, _imp: &Self::ElementImpl, _element: &gst::Element, ) -> Result<(), gst::LoggableError>122 fn src_activate(
123 &self,
124 pad: &PadSrcRef,
125 _imp: &Self::ElementImpl,
126 _element: &gst::Element,
127 ) -> Result<(), gst::LoggableError> {
128 let gst_pad = pad.gst_pad();
129 if gst_pad.is_active() {
130 gst_debug!(
131 RUNTIME_CAT,
132 obj: gst_pad,
133 "Already activated in {:?} mode ",
134 gst_pad.mode()
135 );
136 return Ok(());
137 }
138
139 gst_pad
140 .activate_mode(gst::PadMode::Push, true)
141 .map_err(|err| {
142 gst_error!(
143 RUNTIME_CAT,
144 obj: gst_pad,
145 "Error in PadSrc activate: {:?}",
146 err
147 );
148 gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err)
149 })
150 }
151
src_activatemode( &self, _pad: &PadSrcRef, _imp: &Self::ElementImpl, _element: &gst::Element, _mode: gst::PadMode, _active: bool, ) -> Result<(), gst::LoggableError>152 fn src_activatemode(
153 &self,
154 _pad: &PadSrcRef,
155 _imp: &Self::ElementImpl,
156 _element: &gst::Element,
157 _mode: gst::PadMode,
158 _active: bool,
159 ) -> Result<(), gst::LoggableError> {
160 Ok(())
161 }
162
src_event( &self, pad: &PadSrcRef, _imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> bool163 fn src_event(
164 &self,
165 pad: &PadSrcRef,
166 _imp: &Self::ElementImpl,
167 element: &gst::Element,
168 event: gst::Event,
169 ) -> bool {
170 gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
171 pad.gst_pad().event_default(Some(element), event)
172 }
173
src_event_full( &self, pad: &PadSrcRef, imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> Result<FlowSuccess, FlowError>174 fn src_event_full(
175 &self,
176 pad: &PadSrcRef,
177 imp: &Self::ElementImpl,
178 element: &gst::Element,
179 event: gst::Event,
180 ) -> Result<FlowSuccess, FlowError> {
181 // default is to dispatch to `src_event`
182 // (as implemented in `gst_pad_send_event_unchecked`)
183 let event_type = event.type_();
184 event_to_event_full(self.src_event(pad, imp, element, event), event_type)
185 }
186
src_query( &self, pad: &PadSrcRef, _imp: &Self::ElementImpl, element: &gst::Element, query: &mut gst::QueryRef, ) -> bool187 fn src_query(
188 &self,
189 pad: &PadSrcRef,
190 _imp: &Self::ElementImpl,
191 element: &gst::Element,
192 query: &mut gst::QueryRef,
193 ) -> bool {
194 gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
195 if query.is_serialized() {
196 // FIXME serialized queries should be handled with the dataflow
197 // but we can't return a `Future` because we couldn't honor QueryRef's lifetime
198 false
199 } else {
200 pad.gst_pad().query_default(Some(element), query)
201 }
202 }
203 }
204
205 #[derive(Debug)]
206 pub struct PadSrcInner {
207 gst_pad: gst::Pad,
208 }
209
210 impl PadSrcInner {
new(gst_pad: gst::Pad) -> Self211 fn new(gst_pad: gst::Pad) -> Self {
212 if gst_pad.direction() != gst::PadDirection::Src {
213 panic!("Wrong pad direction for PadSrc");
214 }
215
216 PadSrcInner { gst_pad }
217 }
218
gst_pad(&self) -> &gst::Pad219 pub fn gst_pad(&self) -> &gst::Pad {
220 &self.gst_pad
221 }
222
push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError>223 pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
224 gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
225
226 let success = self.gst_pad.push(buffer).map_err(|err| {
227 gst_error!(RUNTIME_CAT,
228 obj: self.gst_pad(),
229 "Failed to push Buffer to PadSrc: {:?}",
230 err,
231 );
232 err
233 })?;
234
235 gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
236 while Context::current_has_sub_tasks() {
237 Context::drain_sub_tasks().await?;
238 }
239
240 Ok(success)
241 }
242
push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError>243 pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
244 gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
245
246 let success = self.gst_pad.push_list(list).map_err(|err| {
247 gst_error!(
248 RUNTIME_CAT,
249 obj: self.gst_pad(),
250 "Failed to push BufferList to PadSrc: {:?}",
251 err,
252 );
253 err
254 })?;
255
256 gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
257 while Context::current_has_sub_tasks() {
258 Context::drain_sub_tasks().await?;
259 }
260
261 Ok(success)
262 }
263
push_event(&self, event: gst::Event) -> bool264 pub async fn push_event(&self, event: gst::Event) -> bool {
265 gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Pushing {:?}", event);
266
267 let was_handled = self.gst_pad().push_event(event);
268
269 gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
270 while Context::current_has_sub_tasks() {
271 if Context::drain_sub_tasks().await.is_err() {
272 return false;
273 }
274 }
275
276 was_handled
277 }
278 }
279
280 /// A [`PadSrc`] which can be moved in [`handler`]s functions and `Future`s.
281 ///
282 /// Call [`upgrade`] to use the [`PadSrc`].
283 ///
284 /// *See the [`pad` module] documentation for a description of the model.*
285 ///
286 /// [`PadSrc`]: struct.PadSrc.html
287 /// [`handler`]: trait.PadSrcHandler.html
288 /// [`upgrade`]: struct.PadSrcWeak.html#method.upgrade
289 /// [`pad` module]: index.html
290 #[derive(Clone, Debug)]
291 pub struct PadSrcWeak(Weak<PadSrcInner>);
292
293 impl PadSrcWeak {
upgrade(&self) -> Option<PadSrcRef<'_>>294 pub fn upgrade(&self) -> Option<PadSrcRef<'_>> {
295 self.0.upgrade().map(PadSrcRef::new)
296 }
297 }
298
299 /// A [`PadSrc`] to be used in `Handler`s functions and `Future`s.
300 ///
301 /// Call [`downgrade`] if you need to `clone` the [`PadSrc`].
302 ///
303 /// *See the [`pad` module] documentation for a description of the model.*
304 ///
305 /// [`PadSrc`]: struct.PadSrc.html
306 /// [`PadSrcWeak`]: struct.PadSrcWeak.html
307 /// [`downgrade`]: struct.PadSrcRef.html#method.downgrade
308 /// [`pad` module]: index.html
309 #[derive(Debug)]
310 pub struct PadSrcRef<'a> {
311 strong: Arc<PadSrcInner>,
312 phantom: PhantomData<&'a Self>,
313 }
314
315 impl<'a> PadSrcRef<'a> {
new(inner_arc: Arc<PadSrcInner>) -> Self316 fn new(inner_arc: Arc<PadSrcInner>) -> Self {
317 PadSrcRef {
318 strong: inner_arc,
319 phantom: PhantomData,
320 }
321 }
322
downgrade(&self) -> PadSrcWeak323 pub fn downgrade(&self) -> PadSrcWeak {
324 PadSrcWeak(Arc::downgrade(&self.strong))
325 }
326
activate_mode_hook( &self, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError>327 fn activate_mode_hook(
328 &self,
329 mode: gst::PadMode,
330 active: bool,
331 ) -> Result<(), gst::LoggableError> {
332 // Important: don't panic here as the hook is used without `catch_panic_pad_function`
333 // in the default `activatemode` handling
334 gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active);
335
336 if mode == gst::PadMode::Pull {
337 gst_error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSrc");
338 return Err(gst::loggable_error!(
339 RUNTIME_CAT,
340 "Pull mode not supported by PadSrc"
341 ));
342 }
343
344 Ok(())
345 }
346 }
347
348 impl<'a> Deref for PadSrcRef<'a> {
349 type Target = PadSrcInner;
350
deref(&self) -> &Self::Target351 fn deref(&self) -> &Self::Target {
352 &self.strong
353 }
354 }
355
356 /// The `PadSrc` which `Element`s must own.
357 ///
358 /// Call [`downgrade`] if you need to `clone` the `PadSrc`.
359 ///
360 /// *See the [`pad` module] documentation for a description of the model.*
361 ///
362 /// [`downgrade`]: struct.PadSrc.html#method.downgrade
363 /// [`pad` module]: index.html
364 #[derive(Debug)]
365 pub struct PadSrc(Arc<PadSrcInner>);
366
367 impl PadSrc {
new(gst_pad: gst::Pad, handler: impl PadSrcHandler) -> Self368 pub fn new(gst_pad: gst::Pad, handler: impl PadSrcHandler) -> Self {
369 let this = PadSrc(Arc::new(PadSrcInner::new(gst_pad)));
370 this.init_pad_functions(handler);
371
372 this
373 }
374
downgrade(&self) -> PadSrcWeak375 pub fn downgrade(&self) -> PadSrcWeak {
376 PadSrcWeak(Arc::downgrade(&self.0))
377 }
378
as_ref(&self) -> PadSrcRef<'_>379 pub fn as_ref(&self) -> PadSrcRef<'_> {
380 PadSrcRef::new(Arc::clone(&self.0))
381 }
382
check_reconfigure(&self) -> bool383 pub fn check_reconfigure(&self) -> bool {
384 self.0.gst_pad().check_reconfigure()
385 }
386
init_pad_functions<H: PadSrcHandler>(&self, handler: H)387 fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
388 // FIXME: Do this better
389 unsafe {
390 let handler_clone = handler.clone();
391 let inner_arc = Arc::clone(&self.0);
392 self.0
393 .gst_pad()
394 .set_activate_function(move |gst_pad, parent| {
395 let handler = handler_clone.clone();
396 let inner_arc = inner_arc.clone();
397 H::ElementImpl::catch_panic_pad_function(
398 parent,
399 || {
400 gst_error!(RUNTIME_CAT, obj: gst_pad, "Panic in PadSrc activate");
401 Err(gst::loggable_error!(
402 RUNTIME_CAT,
403 "Panic in PadSrc activate"
404 ))
405 },
406 move |imp, element| {
407 let this_ref = PadSrcRef::new(inner_arc);
408 handler.src_activate(
409 &this_ref,
410 imp,
411 element.dynamic_cast_ref::<gst::Element>().unwrap(),
412 )
413 },
414 )
415 });
416
417 let handler_clone = handler.clone();
418 let inner_arc = Arc::clone(&self.0);
419 self.gst_pad()
420 .set_activatemode_function(move |gst_pad, parent, mode, active| {
421 let handler = handler_clone.clone();
422 let inner_arc = inner_arc.clone();
423 H::ElementImpl::catch_panic_pad_function(
424 parent,
425 || {
426 gst_error!(RUNTIME_CAT, obj: gst_pad, "Panic in PadSrc activatemode");
427 Err(gst::loggable_error!(
428 RUNTIME_CAT,
429 "Panic in PadSrc activatemode"
430 ))
431 },
432 move |imp, element| {
433 let this_ref = PadSrcRef::new(inner_arc);
434 this_ref.activate_mode_hook(mode, active)?;
435 handler.src_activatemode(
436 &this_ref,
437 imp,
438 element.dynamic_cast_ref::<gst::Element>().unwrap(),
439 mode,
440 active,
441 )
442 },
443 )
444 });
445
446 // No need to `set_event_function` since `set_event_full_function`
447 // overrides it and dispatches to `src_event` when necessary
448 let handler_clone = handler.clone();
449 let inner_arc = Arc::clone(&self.0);
450 self.gst_pad()
451 .set_event_full_function(move |_gst_pad, parent, event| {
452 let handler = handler_clone.clone();
453 let inner_arc = inner_arc.clone();
454 H::ElementImpl::catch_panic_pad_function(
455 parent,
456 || Err(FlowError::Error),
457 move |imp, element| {
458 let this_ref = PadSrcRef::new(inner_arc);
459 handler.src_event_full(
460 &this_ref,
461 imp,
462 element.dynamic_cast_ref::<gst::Element>().unwrap(),
463 event,
464 )
465 },
466 )
467 });
468
469 let inner_arc = Arc::clone(&self.0);
470 self.gst_pad()
471 .set_query_function(move |_gst_pad, parent, query| {
472 let handler = handler.clone();
473 let inner_arc = inner_arc.clone();
474 H::ElementImpl::catch_panic_pad_function(
475 parent,
476 || false,
477 move |imp, element| {
478 let this_ref = PadSrcRef::new(inner_arc);
479 if !query.is_serialized() {
480 handler.src_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
481 } else {
482 gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
483 false
484 }
485 },
486 )
487 });
488 }
489 }
490 }
491
492 impl Drop for PadSrc {
drop(&mut self)493 fn drop(&mut self) {
494 // FIXME: Do this better
495 unsafe {
496 self.gst_pad()
497 .set_activate_function(move |_gst_pad, _parent| {
498 Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
499 });
500 self.gst_pad()
501 .set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
502 Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
503 });
504 self.gst_pad()
505 .set_event_function(move |_gst_pad, _parent, _event| false);
506 self.gst_pad()
507 .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
508 self.gst_pad()
509 .set_query_function(move |_gst_pad, _parent, _query| false);
510 }
511 }
512 }
513
514 impl Deref for PadSrc {
515 type Target = PadSrcInner;
516
deref(&self) -> &Self::Target517 fn deref(&self) -> &Self::Target {
518 &self.0
519 }
520 }
521
522 /// A trait to define `handler`s for [`PadSink`] callbacks.
523 ///
524 /// *See the [`pad` module] documentation for a description of the model.*
525 ///
526 /// [`PadSink`]: struct.PadSink.html
527 /// [`pad` module]: index.html
528 pub trait PadSinkHandler: Clone + Send + Sync + 'static {
529 type ElementImpl: ElementImpl + ObjectSubclass;
530 // FIXME: Once associated type bounds are stable we should use ObjectSubclass::Type below
531 // instead of &gst::Element
532
sink_activate( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, ) -> Result<(), gst::LoggableError>533 fn sink_activate(
534 &self,
535 pad: &PadSinkRef,
536 _imp: &Self::ElementImpl,
537 _element: &gst::Element,
538 ) -> Result<(), gst::LoggableError> {
539 let gst_pad = pad.gst_pad();
540 if gst_pad.is_active() {
541 gst_debug!(
542 RUNTIME_CAT,
543 obj: gst_pad,
544 "Already activated in {:?} mode ",
545 gst_pad.mode()
546 );
547 return Ok(());
548 }
549
550 gst_pad
551 .activate_mode(gst::PadMode::Push, true)
552 .map_err(|err| {
553 gst_error!(
554 RUNTIME_CAT,
555 obj: gst_pad,
556 "Error in PadSink activate: {:?}",
557 err
558 );
559 gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err)
560 })
561 }
562
sink_activatemode( &self, _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _mode: gst::PadMode, _active: bool, ) -> Result<(), gst::LoggableError>563 fn sink_activatemode(
564 &self,
565 _pad: &PadSinkRef,
566 _imp: &Self::ElementImpl,
567 _element: &gst::Element,
568 _mode: gst::PadMode,
569 _active: bool,
570 ) -> Result<(), gst::LoggableError> {
571 Ok(())
572 }
573
sink_chain( &self, _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _buffer: gst::Buffer, ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>>574 fn sink_chain(
575 &self,
576 _pad: &PadSinkRef,
577 _imp: &Self::ElementImpl,
578 _element: &gst::Element,
579 _buffer: gst::Buffer,
580 ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
581 future::err(FlowError::NotSupported).boxed()
582 }
583
sink_chain_list( &self, _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _buffer_list: gst::BufferList, ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>>584 fn sink_chain_list(
585 &self,
586 _pad: &PadSinkRef,
587 _imp: &Self::ElementImpl,
588 _element: &gst::Element,
589 _buffer_list: gst::BufferList,
590 ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
591 future::err(FlowError::NotSupported).boxed()
592 }
593
sink_event( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> bool594 fn sink_event(
595 &self,
596 pad: &PadSinkRef,
597 _imp: &Self::ElementImpl,
598 element: &gst::Element,
599 event: gst::Event,
600 ) -> bool {
601 assert!(!event.is_serialized());
602 gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
603 pad.gst_pad().event_default(Some(element), event)
604 }
605
sink_event_serialized( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> BoxFuture<'static, bool>606 fn sink_event_serialized(
607 &self,
608 pad: &PadSinkRef,
609 _imp: &Self::ElementImpl,
610 element: &gst::Element,
611 event: gst::Event,
612 ) -> BoxFuture<'static, bool> {
613 assert!(event.is_serialized());
614 let pad_weak = pad.downgrade();
615 let element = element.clone();
616
617 async move {
618 let pad = pad_weak.upgrade().expect("PadSink no longer exists");
619 gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
620
621 pad.gst_pad().event_default(Some(&element), event)
622 }
623 .boxed()
624 }
625
sink_event_full( &self, pad: &PadSinkRef, imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> Result<FlowSuccess, FlowError>626 fn sink_event_full(
627 &self,
628 pad: &PadSinkRef,
629 imp: &Self::ElementImpl,
630 element: &gst::Element,
631 event: gst::Event,
632 ) -> Result<FlowSuccess, FlowError> {
633 assert!(!event.is_serialized());
634 // default is to dispatch to `sink_event`
635 // (as implemented in `gst_pad_send_event_unchecked`)
636 let event_type = event.type_();
637 event_to_event_full(self.sink_event(pad, imp, element, event), event_type)
638 }
639
sink_event_full_serialized( &self, pad: &PadSinkRef, imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>>640 fn sink_event_full_serialized(
641 &self,
642 pad: &PadSinkRef,
643 imp: &Self::ElementImpl,
644 element: &gst::Element,
645 event: gst::Event,
646 ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
647 assert!(event.is_serialized());
648 // default is to dispatch to `sink_event`
649 // (as implemented in `gst_pad_send_event_unchecked`)
650 let event_type = event.type_();
651 event_to_event_full_serialized(
652 self.sink_event_serialized(pad, imp, element, event),
653 event_type,
654 )
655 }
656
sink_query( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, element: &gst::Element, query: &mut gst::QueryRef, ) -> bool657 fn sink_query(
658 &self,
659 pad: &PadSinkRef,
660 _imp: &Self::ElementImpl,
661 element: &gst::Element,
662 query: &mut gst::QueryRef,
663 ) -> bool {
664 if query.is_serialized() {
665 gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Dropping {:?}", query);
666 // FIXME serialized queries should be handled with the dataflow
667 // but we can't return a `Future` because we couldn't honor QueryRef's lifetime
668 false
669 } else {
670 gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
671 pad.gst_pad().query_default(Some(element), query)
672 }
673 }
674 }
675
676 #[derive(Debug)]
677 pub struct PadSinkInner {
678 gst_pad: gst::Pad,
679 }
680
681 impl PadSinkInner {
new(gst_pad: gst::Pad) -> Self682 fn new(gst_pad: gst::Pad) -> Self {
683 if gst_pad.direction() != gst::PadDirection::Sink {
684 panic!("Wrong pad direction for PadSink");
685 }
686
687 PadSinkInner { gst_pad }
688 }
689
gst_pad(&self) -> &gst::Pad690 pub fn gst_pad(&self) -> &gst::Pad {
691 &self.gst_pad
692 }
693 }
694
695 /// A [`PadSink`] which can be moved in `Handler`s functions and `Future`s.
696 ///
697 /// Call [`upgrade`] to use the [`PadSink`].
698 ///
699 /// *See the [`pad` module] documentation for a description of the model.*
700 ///
701 /// [`PadSink`]: struct.PadSink.html
702 /// [`upgrade`]: struct.PadSinkWeak.html#method.upgrade
703 /// [`pad` module]: index.html
704 #[derive(Clone, Debug)]
705 pub struct PadSinkWeak(Weak<PadSinkInner>);
706
707 impl PadSinkWeak {
upgrade(&self) -> Option<PadSinkRef<'_>>708 pub fn upgrade(&self) -> Option<PadSinkRef<'_>> {
709 self.0.upgrade().map(PadSinkRef::new)
710 }
711 }
712
713 /// A [`PadSink`] to be used in [`handler`]s functions and `Future`s.
714 ///
715 /// Call [`downgrade`] if you need to `clone` the [`PadSink`].
716 ///
717 /// *See the [`pad` module] documentation for a description of the model.*
718 ///
719 /// [`PadSink`]: struct.PadSink.html
720 /// [`handler`]: trait.PadSinkHandler.html
721 /// [`downgrade`]: struct.PadSinkRef.html#method.downgrade
722 /// [`pad` module]: index.html
723 #[derive(Debug)]
724 pub struct PadSinkRef<'a> {
725 strong: Arc<PadSinkInner>,
726 phantom: PhantomData<&'a Self>,
727 }
728
729 impl<'a> PadSinkRef<'a> {
new(inner_arc: Arc<PadSinkInner>) -> Self730 fn new(inner_arc: Arc<PadSinkInner>) -> Self {
731 PadSinkRef {
732 strong: inner_arc,
733 phantom: PhantomData,
734 }
735 }
736
downgrade(&self) -> PadSinkWeak737 pub fn downgrade(&self) -> PadSinkWeak {
738 PadSinkWeak(Arc::downgrade(&self.strong))
739 }
740
activate_mode_hook( &self, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError>741 fn activate_mode_hook(
742 &self,
743 mode: gst::PadMode,
744 active: bool,
745 ) -> Result<(), gst::LoggableError> {
746 // Important: don't panic here as the hook is used without `catch_panic_pad_function`
747 // in the default `activatemode` handling
748 gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active);
749
750 if mode == gst::PadMode::Pull {
751 gst_error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSink");
752 return Err(gst::loggable_error!(
753 RUNTIME_CAT,
754 "Pull mode not supported by PadSink"
755 ));
756 }
757
758 Ok(())
759 }
760
handle_future( &self, fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static, ) -> Result<FlowSuccess, FlowError>761 fn handle_future(
762 &self,
763 fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
764 ) -> Result<FlowSuccess, FlowError> {
765 if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) {
766 block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok)))
767 .unwrap_or(Ok(gst::FlowSuccess::Ok))
768 } else {
769 Ok(gst::FlowSuccess::Ok)
770 }
771 }
772 }
773
774 impl<'a> Deref for PadSinkRef<'a> {
775 type Target = PadSinkInner;
776
deref(&self) -> &Self::Target777 fn deref(&self) -> &Self::Target {
778 &self.strong
779 }
780 }
781
782 /// The `PadSink` which `Element`s must own.
783 ///
784 /// Call [`downgrade`] if you need to `clone` the `PadSink`.
785 ///
786 /// *See the [`pad` module] documentation for a description of the model.*
787 ///
788 /// [`downgrade`]: struct.PadSink.html#method.downgrade
789 /// [`pad` module]: index.html
790 #[derive(Debug)]
791 pub struct PadSink(Arc<PadSinkInner>);
792
793 impl PadSink {
new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self794 pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self {
795 let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
796 this.init_pad_functions(handler);
797
798 this
799 }
800
downgrade(&self) -> PadSinkWeak801 pub fn downgrade(&self) -> PadSinkWeak {
802 PadSinkWeak(Arc::downgrade(&self.0))
803 }
804
as_ref(&self) -> PadSinkRef<'_>805 pub fn as_ref(&self) -> PadSinkRef<'_> {
806 PadSinkRef::new(Arc::clone(&self.0))
807 }
808
init_pad_functions<H: PadSinkHandler>(&self, handler: H)809 fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) {
810 // FIXME: Do this better
811 unsafe {
812 let handler_clone = handler.clone();
813 let inner_arc = Arc::clone(&self.0);
814 self.gst_pad()
815 .set_activate_function(move |gst_pad, parent| {
816 let handler = handler_clone.clone();
817 let inner_arc = inner_arc.clone();
818 H::ElementImpl::catch_panic_pad_function(
819 parent,
820 || {
821 gst_error!(RUNTIME_CAT, obj: gst_pad, "Panic in PadSink activate");
822 Err(gst::loggable_error!(
823 RUNTIME_CAT,
824 "Panic in PadSink activate"
825 ))
826 },
827 move |imp, element| {
828 let this_ref = PadSinkRef::new(inner_arc);
829 handler.sink_activate(
830 &this_ref,
831 imp,
832 element.dynamic_cast_ref::<gst::Element>().unwrap(),
833 )
834 },
835 )
836 });
837
838 let handler_clone = handler.clone();
839 let inner_arc = Arc::clone(&self.0);
840 self.gst_pad()
841 .set_activatemode_function(move |gst_pad, parent, mode, active| {
842 let handler = handler_clone.clone();
843 let inner_arc = inner_arc.clone();
844 H::ElementImpl::catch_panic_pad_function(
845 parent,
846 || {
847 gst_error!(RUNTIME_CAT, obj: gst_pad, "Panic in PadSink activatemode");
848 Err(gst::loggable_error!(
849 RUNTIME_CAT,
850 "Panic in PadSink activatemode"
851 ))
852 },
853 move |imp, element| {
854 let this_ref = PadSinkRef::new(inner_arc);
855 this_ref.activate_mode_hook(mode, active)?;
856
857 handler.sink_activatemode(
858 &this_ref,
859 imp,
860 element.dynamic_cast_ref::<gst::Element>().unwrap(),
861 mode,
862 active,
863 )
864 },
865 )
866 });
867
868 let handler_clone = handler.clone();
869 let inner_arc = Arc::clone(&self.0);
870 self.gst_pad()
871 .set_chain_function(move |_gst_pad, parent, buffer| {
872 let handler = handler_clone.clone();
873 let inner_arc = inner_arc.clone();
874 H::ElementImpl::catch_panic_pad_function(
875 parent,
876 || Err(FlowError::Error),
877 move |imp, element| {
878 if Context::current_has_sub_tasks() {
879 let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
880 let handler = handler.clone();
881 let element =
882 element.clone().dynamic_cast::<gst::Element>().unwrap();
883 let delayed_fut = async move {
884 let imp = <H::ElementImpl as ObjectSubclassExt>::from_instance(
885 element.unsafe_cast_ref(),
886 );
887 let this_ref =
888 this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
889 handler.sink_chain(&this_ref, imp, &element, buffer).await
890 };
891 let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
892
893 Ok(gst::FlowSuccess::Ok)
894 } else {
895 let this_ref = PadSinkRef::new(inner_arc);
896 let chain_fut = handler.sink_chain(
897 &this_ref,
898 imp,
899 element.dynamic_cast_ref::<gst::Element>().unwrap(),
900 buffer,
901 );
902 this_ref.handle_future(chain_fut)
903 }
904 },
905 )
906 });
907
908 let handler_clone = handler.clone();
909 let inner_arc = Arc::clone(&self.0);
910 self.gst_pad()
911 .set_chain_list_function(move |_gst_pad, parent, list| {
912 let handler = handler_clone.clone();
913 let inner_arc = inner_arc.clone();
914 H::ElementImpl::catch_panic_pad_function(
915 parent,
916 || Err(FlowError::Error),
917 move |imp, element| {
918 if Context::current_has_sub_tasks() {
919 let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
920 let handler = handler.clone();
921 let element =
922 element.clone().dynamic_cast::<gst::Element>().unwrap();
923 let delayed_fut = async move {
924 let imp = <H::ElementImpl as ObjectSubclassExt>::from_instance(
925 element.unsafe_cast_ref(),
926 );
927 let this_ref =
928 this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
929 handler
930 .sink_chain_list(&this_ref, imp, &element, list)
931 .await
932 };
933 let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
934
935 Ok(gst::FlowSuccess::Ok)
936 } else {
937 let this_ref = PadSinkRef::new(inner_arc);
938 let chain_list_fut = handler.sink_chain_list(
939 &this_ref,
940 imp,
941 element.dynamic_cast_ref::<gst::Element>().unwrap(),
942 list,
943 );
944 this_ref.handle_future(chain_list_fut)
945 }
946 },
947 )
948 });
949
950 // No need to `set_event_function` since `set_event_full_function`
951 // overrides it and dispatches to `sink_event` when necessary
952 let handler_clone = handler.clone();
953 let inner_arc = Arc::clone(&self.0);
954 self.gst_pad()
955 .set_event_full_function(move |_gst_pad, parent, event| {
956 let handler = handler_clone.clone();
957 let inner_arc = inner_arc.clone();
958 H::ElementImpl::catch_panic_pad_function(
959 parent,
960 || Err(FlowError::Error),
961 move |imp, element| {
962 if event.is_serialized() {
963 if Context::current_has_sub_tasks() {
964 let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
965 let handler = handler.clone();
966 let element =
967 element.clone().dynamic_cast::<gst::Element>().unwrap();
968 let delayed_fut = async move {
969 let imp =
970 <H::ElementImpl as ObjectSubclassExt>::from_instance(
971 element.unsafe_cast_ref(),
972 );
973 let this_ref =
974 this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
975
976 handler
977 .sink_event_full_serialized(
978 &this_ref, imp, &element, event,
979 )
980 .await
981 };
982 let _ =
983 Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
984
985 Ok(gst::FlowSuccess::Ok)
986 } else {
987 let this_ref = PadSinkRef::new(inner_arc);
988 let event_fut = handler.sink_event_full_serialized(
989 &this_ref,
990 imp,
991 element.dynamic_cast_ref::<gst::Element>().unwrap(),
992 event,
993 );
994 this_ref.handle_future(event_fut)
995 }
996 } else {
997 let this_ref = PadSinkRef::new(inner_arc);
998 handler.sink_event_full(
999 &this_ref,
1000 imp,
1001 element.dynamic_cast_ref::<gst::Element>().unwrap(),
1002 event,
1003 )
1004 }
1005 },
1006 )
1007 });
1008
1009 let inner_arc = Arc::clone(&self.0);
1010 self.gst_pad()
1011 .set_query_function(move |_gst_pad, parent, query| {
1012 let handler = handler.clone();
1013 let inner_arc = inner_arc.clone();
1014 H::ElementImpl::catch_panic_pad_function(
1015 parent,
1016 || false,
1017 move |imp, element| {
1018 let this_ref = PadSinkRef::new(inner_arc);
1019 if !query.is_serialized() {
1020 handler.sink_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
1021 } else {
1022 gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
1023 false
1024 }
1025 },
1026 )
1027 });
1028 }
1029 }
1030 }
1031
1032 impl Drop for PadSink {
drop(&mut self)1033 fn drop(&mut self) {
1034 // FIXME: Do this better
1035 unsafe {
1036 self.gst_pad()
1037 .set_activate_function(move |_gst_pad, _parent| {
1038 Err(gst::loggable_error!(
1039 RUNTIME_CAT,
1040 "PadSink no longer exists"
1041 ))
1042 });
1043 self.gst_pad()
1044 .set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
1045 Err(gst::loggable_error!(
1046 RUNTIME_CAT,
1047 "PadSink no longer exists"
1048 ))
1049 });
1050 self.gst_pad()
1051 .set_chain_function(move |_gst_pad, _parent, _buffer| Err(FlowError::Flushing));
1052 self.gst_pad()
1053 .set_chain_list_function(move |_gst_pad, _parent, _list| Err(FlowError::Flushing));
1054 self.gst_pad()
1055 .set_event_function(move |_gst_pad, _parent, _event| false);
1056 self.gst_pad()
1057 .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
1058 self.gst_pad()
1059 .set_query_function(move |_gst_pad, _parent, _query| false);
1060 }
1061 }
1062 }
1063
1064 impl Deref for PadSink {
1065 type Target = PadSinkInner;
1066
deref(&self) -> &Self::Target1067 fn deref(&self) -> &Self::Target {
1068 &self.0
1069 }
1070 }
1071