1 // Copyright (C) 2019-2020 François Laignel <fengalin@free.fr>
2 // Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
3 //
4 // This library is free software; you can redistribute it and/or
5 // modify it under the terms of the GNU Library General Public
6 // License as published by the Free Software Foundation; either
7 // version 2 of the License, or (at your option) any later version.
8 //
9 // This library is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12 // Library General Public License for more details.
13 //
14 // You should have received a copy of the GNU Library General Public
15 // License along with this library; if not, write to the
16 // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
17 // Boston, MA 02110-1335, USA.
18 
//! An implementation of `Pad`s to run asynchronous processing.
20 //!
21 //! [`PadSink`] & [`PadSrc`] provide an asynchronous API to ease the development of `Element`s in
22 //! the `threadshare` GStreamer plugins framework.
23 //!
24 //! The diagram below shows how the [`PadSrc`] & [`PadSink`] and the related `struct`s integrate in
25 //! `ts` `Element`s.
26 //!
27 //! Note: [`PadSrc`] & [`PadSink`] only support `gst::PadMode::Push` at the moment.
28 //!
29 //! ```text
30 //!    ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓          ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━
31 //!                    Element A               ┃          ┃          Element B
32 //!                                            ┃          ┃
33 //!                 ╭─────────────────╮        ┃          ┃       ╭──────────────────╮
34 //!                 │     PadSrc      │        ┃          ┃       │      PadSink     │
35 //!                 │     Handler     │        ┃          ┃       │      Handler     │
36 //!                 │─────────────────│        ┃          ┃       │──────────────────│
37 //!                 │ - src_activate* │     ╭──┸──╮    ╭──┸──╮    │ - sink_activate* │
38 //!                 │ - src_event*    │<────│     │<╌╌╌│     │───>│ - sink_chain*    │
39 //!                 │ - src_query     │<────│ gst │    │ gst │───>│ - sink_event*    │
40 //!                 │─────────────────│     │     │    │     │───>│ - sink_query     │
41 //!                 │ - task fn       │     │ Pad │    │ Pad │    ╰──────────────────╯
42 //!                 ╰─────────────────╯  ╭─>│     │╌╌╌>│     │─╮            │
43 //!           ╭───────╯      │           │  ╰──┰──╯    ╰──┰──╯ ╰───────╮    │
44 //!    ╭────────────╮   ╭────────╮ push* │     ┃          ┃          ╭─────────╮
45 //!    │ Pad Task ↺ │──>│ PadSrc │───────╯     ┃          ┃          │ PadSink │
46 //!    ╰────────────╯   ╰────────╯             ┃          ┃          ╰─────────╯
47 //!    ━━━━━━━│━━━━━━━━━━━━━━│━━━━━━━━━━━━━━━━━┛          ┗━━━━━━━━━━━━━━━│━━━━━━━━━━━━
48 //!           ╰──────────────┴───────────────────╮      ╭─────────────────╯
49 //!                                           ╭────────────╮
50 //!                                           │  Context ↺ │
51 //!                                           ╰────────────╯
52 //! ```
53 //!
//! Asynchronous operations for both [`PadSrc`] in `Element A` and [`PadSink`] in `Element B` run on
//! the same [`Context`], which can also be shared by other `Element`s or instances of the same
//! `Element`s in multiple `Pipeline`s.
57 //!
58 //! `Element A` & `Element B` can also be linked to non-threadshare `Element`s in which case, they
59 //! operate in a regular synchronous way.
60 //!
//! Note that only operations on the streaming thread (serialized events, buffers, serialized
//! queries) are handled from the `PadContext` and asynchronously; everything else operates in a
//! blocking way.
64 //!
65 //! [`PadSink`]: struct.PadSink.html
66 //! [`PadSrc`]: struct.PadSrc.html
67 //! [`Context`]: ../executor/struct.Context.html
68 
69 use futures::future;
70 use futures::future::BoxFuture;
71 use futures::prelude::*;
72 
73 use gst::prelude::*;
74 use gst::subclass::prelude::*;
75 use gst::{gst_debug, gst_error, gst_fixme, gst_log};
76 use gst::{FlowError, FlowSuccess};
77 
78 use std::marker::PhantomData;
79 use std::ops::Deref;
80 use std::sync::{Arc, Weak};
81 
82 use super::executor::{block_on_or_add_sub_task, Context};
83 use super::RUNTIME_CAT;
84 
85 #[inline]
event_ret_to_event_full_res( ret: bool, event_type: gst::EventType, ) -> Result<FlowSuccess, FlowError>86 fn event_ret_to_event_full_res(
87     ret: bool,
88     event_type: gst::EventType,
89 ) -> Result<FlowSuccess, FlowError> {
90     if ret {
91         Ok(FlowSuccess::Ok)
92     } else if event_type == gst::EventType::Caps {
93         Err(FlowError::NotNegotiated)
94     } else {
95         Err(FlowError::Error)
96     }
97 }
98 
/// Maps the `bool` result of a synchronous event handler to an `event_full` result.
///
/// This is a thin wrapper around `event_ret_to_event_full_res`.
#[inline]
fn event_to_event_full(ret: bool, event_type: gst::EventType) -> Result<FlowSuccess, FlowError> {
    event_ret_to_event_full_res(ret, event_type)
}
103 
104 #[inline]
event_to_event_full_serialized( ret: BoxFuture<'static, bool>, event_type: gst::EventType, ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>>105 fn event_to_event_full_serialized(
106     ret: BoxFuture<'static, bool>,
107     event_type: gst::EventType,
108 ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
109     ret.map(move |ret| event_ret_to_event_full_res(ret, event_type))
110         .boxed()
111 }
112 
113 /// A trait to define `handler`s for [`PadSrc`] callbacks.
114 ///
115 /// *See the [`pad` module] documentation for a description of the model.*
116 ///
117 /// [`PadSrc`]: struct.PadSrc.html
118 /// [`pad` module]: index.html
119 pub trait PadSrcHandler: Clone + Send + Sync + 'static {
120     type ElementImpl: ElementImpl + ObjectSubclass;
121 
src_activate( &self, pad: &PadSrcRef, _imp: &Self::ElementImpl, _element: &gst::Element, ) -> Result<(), gst::LoggableError>122     fn src_activate(
123         &self,
124         pad: &PadSrcRef,
125         _imp: &Self::ElementImpl,
126         _element: &gst::Element,
127     ) -> Result<(), gst::LoggableError> {
128         let gst_pad = pad.gst_pad();
129         if gst_pad.is_active() {
130             gst_debug!(
131                 RUNTIME_CAT,
132                 obj: gst_pad,
133                 "Already activated in {:?} mode ",
134                 gst_pad.mode()
135             );
136             return Ok(());
137         }
138 
139         gst_pad
140             .activate_mode(gst::PadMode::Push, true)
141             .map_err(|err| {
142                 gst_error!(
143                     RUNTIME_CAT,
144                     obj: gst_pad,
145                     "Error in PadSrc activate: {:?}",
146                     err
147                 );
148                 gst::loggable_error!(RUNTIME_CAT, "Error in PadSrc activate: {:?}", err)
149             })
150     }
151 
src_activatemode( &self, _pad: &PadSrcRef, _imp: &Self::ElementImpl, _element: &gst::Element, _mode: gst::PadMode, _active: bool, ) -> Result<(), gst::LoggableError>152     fn src_activatemode(
153         &self,
154         _pad: &PadSrcRef,
155         _imp: &Self::ElementImpl,
156         _element: &gst::Element,
157         _mode: gst::PadMode,
158         _active: bool,
159     ) -> Result<(), gst::LoggableError> {
160         Ok(())
161     }
162 
src_event( &self, pad: &PadSrcRef, _imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> bool163     fn src_event(
164         &self,
165         pad: &PadSrcRef,
166         _imp: &Self::ElementImpl,
167         element: &gst::Element,
168         event: gst::Event,
169     ) -> bool {
170         gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
171         pad.gst_pad().event_default(Some(element), event)
172     }
173 
src_event_full( &self, pad: &PadSrcRef, imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> Result<FlowSuccess, FlowError>174     fn src_event_full(
175         &self,
176         pad: &PadSrcRef,
177         imp: &Self::ElementImpl,
178         element: &gst::Element,
179         event: gst::Event,
180     ) -> Result<FlowSuccess, FlowError> {
181         // default is to dispatch to `src_event`
182         // (as implemented in `gst_pad_send_event_unchecked`)
183         let event_type = event.type_();
184         event_to_event_full(self.src_event(pad, imp, element, event), event_type)
185     }
186 
src_query( &self, pad: &PadSrcRef, _imp: &Self::ElementImpl, element: &gst::Element, query: &mut gst::QueryRef, ) -> bool187     fn src_query(
188         &self,
189         pad: &PadSrcRef,
190         _imp: &Self::ElementImpl,
191         element: &gst::Element,
192         query: &mut gst::QueryRef,
193     ) -> bool {
194         gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
195         if query.is_serialized() {
196             // FIXME serialized queries should be handled with the dataflow
197             // but we can't return a `Future` because we couldn't honor QueryRef's lifetime
198             false
199         } else {
200             pad.gst_pad().query_default(Some(element), query)
201         }
202     }
203 }
204 
205 #[derive(Debug)]
206 pub struct PadSrcInner {
207     gst_pad: gst::Pad,
208 }
209 
210 impl PadSrcInner {
new(gst_pad: gst::Pad) -> Self211     fn new(gst_pad: gst::Pad) -> Self {
212         if gst_pad.direction() != gst::PadDirection::Src {
213             panic!("Wrong pad direction for PadSrc");
214         }
215 
216         PadSrcInner { gst_pad }
217     }
218 
gst_pad(&self) -> &gst::Pad219     pub fn gst_pad(&self) -> &gst::Pad {
220         &self.gst_pad
221     }
222 
push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError>223     pub async fn push(&self, buffer: gst::Buffer) -> Result<FlowSuccess, FlowError> {
224         gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", buffer);
225 
226         let success = self.gst_pad.push(buffer).map_err(|err| {
227             gst_error!(RUNTIME_CAT,
228                 obj: self.gst_pad(),
229                 "Failed to push Buffer to PadSrc: {:?}",
230                 err,
231             );
232             err
233         })?;
234 
235         gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
236         while Context::current_has_sub_tasks() {
237             Context::drain_sub_tasks().await?;
238         }
239 
240         Ok(success)
241     }
242 
push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError>243     pub async fn push_list(&self, list: gst::BufferList) -> Result<FlowSuccess, FlowError> {
244         gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "Pushing {:?}", list);
245 
246         let success = self.gst_pad.push_list(list).map_err(|err| {
247             gst_error!(
248                 RUNTIME_CAT,
249                 obj: self.gst_pad(),
250                 "Failed to push BufferList to PadSrc: {:?}",
251                 err,
252             );
253             err
254         })?;
255 
256         gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
257         while Context::current_has_sub_tasks() {
258             Context::drain_sub_tasks().await?;
259         }
260 
261         Ok(success)
262     }
263 
push_event(&self, event: gst::Event) -> bool264     pub async fn push_event(&self, event: gst::Event) -> bool {
265         gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Pushing {:?}", event);
266 
267         let was_handled = self.gst_pad().push_event(event);
268 
269         gst_log!(RUNTIME_CAT, obj: &self.gst_pad, "Processing any pending sub tasks");
270         while Context::current_has_sub_tasks() {
271             if Context::drain_sub_tasks().await.is_err() {
272                 return false;
273             }
274         }
275 
276         was_handled
277     }
278 }
279 
280 /// A [`PadSrc`] which can be moved in [`handler`]s functions and `Future`s.
281 ///
282 /// Call [`upgrade`] to use the [`PadSrc`].
283 ///
284 /// *See the [`pad` module] documentation for a description of the model.*
285 ///
286 /// [`PadSrc`]: struct.PadSrc.html
287 /// [`handler`]: trait.PadSrcHandler.html
288 /// [`upgrade`]: struct.PadSrcWeak.html#method.upgrade
289 /// [`pad` module]: index.html
290 #[derive(Clone, Debug)]
291 pub struct PadSrcWeak(Weak<PadSrcInner>);
292 
293 impl PadSrcWeak {
upgrade(&self) -> Option<PadSrcRef<'_>>294     pub fn upgrade(&self) -> Option<PadSrcRef<'_>> {
295         self.0.upgrade().map(PadSrcRef::new)
296     }
297 }
298 
299 /// A [`PadSrc`] to be used in `Handler`s functions and `Future`s.
300 ///
301 /// Call [`downgrade`] if you need to `clone` the [`PadSrc`].
302 ///
303 /// *See the [`pad` module] documentation for a description of the model.*
304 ///
305 /// [`PadSrc`]: struct.PadSrc.html
306 /// [`PadSrcWeak`]: struct.PadSrcWeak.html
307 /// [`downgrade`]: struct.PadSrcRef.html#method.downgrade
308 /// [`pad` module]: index.html
309 #[derive(Debug)]
310 pub struct PadSrcRef<'a> {
311     strong: Arc<PadSrcInner>,
312     phantom: PhantomData<&'a Self>,
313 }
314 
315 impl<'a> PadSrcRef<'a> {
new(inner_arc: Arc<PadSrcInner>) -> Self316     fn new(inner_arc: Arc<PadSrcInner>) -> Self {
317         PadSrcRef {
318             strong: inner_arc,
319             phantom: PhantomData,
320         }
321     }
322 
downgrade(&self) -> PadSrcWeak323     pub fn downgrade(&self) -> PadSrcWeak {
324         PadSrcWeak(Arc::downgrade(&self.strong))
325     }
326 
activate_mode_hook( &self, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError>327     fn activate_mode_hook(
328         &self,
329         mode: gst::PadMode,
330         active: bool,
331     ) -> Result<(), gst::LoggableError> {
332         // Important: don't panic here as the hook is used without `catch_panic_pad_function`
333         // in the default `activatemode` handling
334         gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active);
335 
336         if mode == gst::PadMode::Pull {
337             gst_error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSrc");
338             return Err(gst::loggable_error!(
339                 RUNTIME_CAT,
340                 "Pull mode not supported by PadSrc"
341             ));
342         }
343 
344         Ok(())
345     }
346 }
347 
348 impl<'a> Deref for PadSrcRef<'a> {
349     type Target = PadSrcInner;
350 
deref(&self) -> &Self::Target351     fn deref(&self) -> &Self::Target {
352         &self.strong
353     }
354 }
355 
/// The `PadSrc` which `Element`s must own.
///
/// Call [`downgrade`] if you need to `clone` the `PadSrc`.
///
/// *See the [`pad` module] documentation for a description of the model.*
///
/// [`downgrade`]: struct.PadSrc.html#method.downgrade
/// [`pad` module]: index.html
#[derive(Debug)]
pub struct PadSrc(Arc<PadSrcInner>);

impl PadSrc {
    /// Wraps `gst_pad` and installs the callbacks of `handler` on it.
    ///
    /// # Panics
    ///
    /// Panics if `gst_pad` is not a source pad (checked in `PadSrcInner::new`).
    pub fn new(gst_pad: gst::Pad, handler: impl PadSrcHandler) -> Self {
        let this = PadSrc(Arc::new(PadSrcInner::new(gst_pad)));
        this.init_pad_functions(handler);

        this
    }

    /// Returns a weak handle on this pad.
    pub fn downgrade(&self) -> PadSrcWeak {
        PadSrcWeak(Arc::downgrade(&self.0))
    }

    /// Returns a `PadSrcRef` holding a new strong reference on this pad.
    pub fn as_ref(&self) -> PadSrcRef<'_> {
        PadSrcRef::new(Arc::clone(&self.0))
    }

    /// Forwards to `check_reconfigure` on the wrapped `gst::Pad`.
    pub fn check_reconfigure(&self) -> bool {
        self.0.gst_pad().check_reconfigure()
    }

    /// Installs the activate, activatemode, event_full and query functions on
    /// the wrapped `gst::Pad`, each one dispatching to `handler`.
    ///
    /// Every installed closure owns a clone of `handler` and a strong
    /// `Arc<PadSrcInner>`. Both are cloned again on each invocation because the
    /// inner closure passed to `catch_panic_pad_function` takes them by value.
    ///
    /// NOTE(review): the strong `Arc`s captured here are stored in callbacks of
    /// the pad that `PadSrcInner` itself owns; presumably `Drop for PadSrc`
    /// replaces the callbacks to release them — confirm.
    fn init_pad_functions<H: PadSrcHandler>(&self, handler: H) {
        // FIXME: Do this better
        // NOTE(review): the `set_*_function` setters require `unsafe`; presumably they
        // must only be called while the pad is not in use yet — confirm against the
        // gstreamer-rs documentation.
        unsafe {
            // Activate: dispatch to `H::src_activate`.
            let handler_clone = handler.clone();
            let inner_arc = Arc::clone(&self.0);
            self.0
                .gst_pad()
                .set_activate_function(move |gst_pad, parent| {
                    let handler = handler_clone.clone();
                    let inner_arc = inner_arc.clone();
                    H::ElementImpl::catch_panic_pad_function(
                        parent,
                        // Fallback value, presumably used when a panic was caught.
                        || {
                            gst_error!(RUNTIME_CAT, obj: gst_pad, "Panic in PadSrc activate");
                            Err(gst::loggable_error!(
                                RUNTIME_CAT,
                                "Panic in PadSrc activate"
                            ))
                        },
                        move |imp, element| {
                            let this_ref = PadSrcRef::new(inner_arc);
                            handler.src_activate(
                                &this_ref,
                                imp,
                                element.dynamic_cast_ref::<gst::Element>().unwrap(),
                            )
                        },
                    )
                });

            // Activate mode: run the `Pull` mode check first, then dispatch to
            // `H::src_activatemode`.
            let handler_clone = handler.clone();
            let inner_arc = Arc::clone(&self.0);
            self.gst_pad()
                .set_activatemode_function(move |gst_pad, parent, mode, active| {
                    let handler = handler_clone.clone();
                    let inner_arc = inner_arc.clone();
                    H::ElementImpl::catch_panic_pad_function(
                        parent,
                        || {
                            gst_error!(RUNTIME_CAT, obj: gst_pad, "Panic in PadSrc activatemode");
                            Err(gst::loggable_error!(
                                RUNTIME_CAT,
                                "Panic in PadSrc activatemode"
                            ))
                        },
                        move |imp, element| {
                            let this_ref = PadSrcRef::new(inner_arc);
                            this_ref.activate_mode_hook(mode, active)?;
                            handler.src_activatemode(
                                &this_ref,
                                imp,
                                element.dynamic_cast_ref::<gst::Element>().unwrap(),
                                mode,
                                active,
                            )
                        },
                    )
                });

            // No need to `set_event_function` since `set_event_full_function`
            // overrides it and dispatches to `src_event` when necessary
            let handler_clone = handler.clone();
            let inner_arc = Arc::clone(&self.0);
            self.gst_pad()
                .set_event_full_function(move |_gst_pad, parent, event| {
                    let handler = handler_clone.clone();
                    let inner_arc = inner_arc.clone();
                    H::ElementImpl::catch_panic_pad_function(
                        parent,
                        || Err(FlowError::Error),
                        move |imp, element| {
                            let this_ref = PadSrcRef::new(inner_arc);
                            handler.src_event_full(
                                &this_ref,
                                imp,
                                element.dynamic_cast_ref::<gst::Element>().unwrap(),
                                event,
                            )
                        },
                    )
                });

            // Query: only non-serialized queries reach `H::src_query`; serialized
            // ones are rejected here. This is the last closure, so `handler`
            // itself is moved into it.
            let inner_arc = Arc::clone(&self.0);
            self.gst_pad()
            .set_query_function(move |_gst_pad, parent, query| {
                let handler = handler.clone();
                let inner_arc = inner_arc.clone();
                H::ElementImpl::catch_panic_pad_function(
                    parent,
                    || false,
                    move |imp, element| {
                        let this_ref = PadSrcRef::new(inner_arc);
                        if !query.is_serialized() {
                            handler.src_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
                        } else {
                            gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
                            false
                        }
                    },
                )
            });
        }
    }
}
491 
492 impl Drop for PadSrc {
drop(&mut self)493     fn drop(&mut self) {
494         // FIXME: Do this better
495         unsafe {
496             self.gst_pad()
497                 .set_activate_function(move |_gst_pad, _parent| {
498                     Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
499                 });
500             self.gst_pad()
501                 .set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
502                     Err(gst::loggable_error!(RUNTIME_CAT, "PadSrc no longer exists"))
503                 });
504             self.gst_pad()
505                 .set_event_function(move |_gst_pad, _parent, _event| false);
506             self.gst_pad()
507                 .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
508             self.gst_pad()
509                 .set_query_function(move |_gst_pad, _parent, _query| false);
510         }
511     }
512 }
513 
514 impl Deref for PadSrc {
515     type Target = PadSrcInner;
516 
deref(&self) -> &Self::Target517     fn deref(&self) -> &Self::Target {
518         &self.0
519     }
520 }
521 
522 /// A trait to define `handler`s for [`PadSink`] callbacks.
523 ///
524 /// *See the [`pad` module] documentation for a description of the model.*
525 ///
526 /// [`PadSink`]: struct.PadSink.html
527 /// [`pad` module]: index.html
528 pub trait PadSinkHandler: Clone + Send + Sync + 'static {
529     type ElementImpl: ElementImpl + ObjectSubclass;
530     // FIXME: Once associated type bounds are stable we should use ObjectSubclass::Type below
531     // instead of &gst::Element
532 
sink_activate( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, ) -> Result<(), gst::LoggableError>533     fn sink_activate(
534         &self,
535         pad: &PadSinkRef,
536         _imp: &Self::ElementImpl,
537         _element: &gst::Element,
538     ) -> Result<(), gst::LoggableError> {
539         let gst_pad = pad.gst_pad();
540         if gst_pad.is_active() {
541             gst_debug!(
542                 RUNTIME_CAT,
543                 obj: gst_pad,
544                 "Already activated in {:?} mode ",
545                 gst_pad.mode()
546             );
547             return Ok(());
548         }
549 
550         gst_pad
551             .activate_mode(gst::PadMode::Push, true)
552             .map_err(|err| {
553                 gst_error!(
554                     RUNTIME_CAT,
555                     obj: gst_pad,
556                     "Error in PadSink activate: {:?}",
557                     err
558                 );
559                 gst::loggable_error!(RUNTIME_CAT, "Error in PadSink activate: {:?}", err)
560             })
561     }
562 
sink_activatemode( &self, _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _mode: gst::PadMode, _active: bool, ) -> Result<(), gst::LoggableError>563     fn sink_activatemode(
564         &self,
565         _pad: &PadSinkRef,
566         _imp: &Self::ElementImpl,
567         _element: &gst::Element,
568         _mode: gst::PadMode,
569         _active: bool,
570     ) -> Result<(), gst::LoggableError> {
571         Ok(())
572     }
573 
sink_chain( &self, _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _buffer: gst::Buffer, ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>>574     fn sink_chain(
575         &self,
576         _pad: &PadSinkRef,
577         _imp: &Self::ElementImpl,
578         _element: &gst::Element,
579         _buffer: gst::Buffer,
580     ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
581         future::err(FlowError::NotSupported).boxed()
582     }
583 
sink_chain_list( &self, _pad: &PadSinkRef, _imp: &Self::ElementImpl, _element: &gst::Element, _buffer_list: gst::BufferList, ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>>584     fn sink_chain_list(
585         &self,
586         _pad: &PadSinkRef,
587         _imp: &Self::ElementImpl,
588         _element: &gst::Element,
589         _buffer_list: gst::BufferList,
590     ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
591         future::err(FlowError::NotSupported).boxed()
592     }
593 
sink_event( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> bool594     fn sink_event(
595         &self,
596         pad: &PadSinkRef,
597         _imp: &Self::ElementImpl,
598         element: &gst::Element,
599         event: gst::Event,
600     ) -> bool {
601         assert!(!event.is_serialized());
602         gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
603         pad.gst_pad().event_default(Some(element), event)
604     }
605 
sink_event_serialized( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> BoxFuture<'static, bool>606     fn sink_event_serialized(
607         &self,
608         pad: &PadSinkRef,
609         _imp: &Self::ElementImpl,
610         element: &gst::Element,
611         event: gst::Event,
612     ) -> BoxFuture<'static, bool> {
613         assert!(event.is_serialized());
614         let pad_weak = pad.downgrade();
615         let element = element.clone();
616 
617         async move {
618             let pad = pad_weak.upgrade().expect("PadSink no longer exists");
619             gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", event);
620 
621             pad.gst_pad().event_default(Some(&element), event)
622         }
623         .boxed()
624     }
625 
sink_event_full( &self, pad: &PadSinkRef, imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> Result<FlowSuccess, FlowError>626     fn sink_event_full(
627         &self,
628         pad: &PadSinkRef,
629         imp: &Self::ElementImpl,
630         element: &gst::Element,
631         event: gst::Event,
632     ) -> Result<FlowSuccess, FlowError> {
633         assert!(!event.is_serialized());
634         // default is to dispatch to `sink_event`
635         // (as implemented in `gst_pad_send_event_unchecked`)
636         let event_type = event.type_();
637         event_to_event_full(self.sink_event(pad, imp, element, event), event_type)
638     }
639 
sink_event_full_serialized( &self, pad: &PadSinkRef, imp: &Self::ElementImpl, element: &gst::Element, event: gst::Event, ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>>640     fn sink_event_full_serialized(
641         &self,
642         pad: &PadSinkRef,
643         imp: &Self::ElementImpl,
644         element: &gst::Element,
645         event: gst::Event,
646     ) -> BoxFuture<'static, Result<FlowSuccess, FlowError>> {
647         assert!(event.is_serialized());
648         // default is to dispatch to `sink_event`
649         // (as implemented in `gst_pad_send_event_unchecked`)
650         let event_type = event.type_();
651         event_to_event_full_serialized(
652             self.sink_event_serialized(pad, imp, element, event),
653             event_type,
654         )
655     }
656 
sink_query( &self, pad: &PadSinkRef, _imp: &Self::ElementImpl, element: &gst::Element, query: &mut gst::QueryRef, ) -> bool657     fn sink_query(
658         &self,
659         pad: &PadSinkRef,
660         _imp: &Self::ElementImpl,
661         element: &gst::Element,
662         query: &mut gst::QueryRef,
663     ) -> bool {
664         if query.is_serialized() {
665             gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Dropping {:?}", query);
666             // FIXME serialized queries should be handled with the dataflow
667             // but we can't return a `Future` because we couldn't honor QueryRef's lifetime
668             false
669         } else {
670             gst_log!(RUNTIME_CAT, obj: pad.gst_pad(), "Handling {:?}", query);
671             pad.gst_pad().query_default(Some(element), query)
672         }
673     }
674 }
675 
676 #[derive(Debug)]
677 pub struct PadSinkInner {
678     gst_pad: gst::Pad,
679 }
680 
681 impl PadSinkInner {
new(gst_pad: gst::Pad) -> Self682     fn new(gst_pad: gst::Pad) -> Self {
683         if gst_pad.direction() != gst::PadDirection::Sink {
684             panic!("Wrong pad direction for PadSink");
685         }
686 
687         PadSinkInner { gst_pad }
688     }
689 
gst_pad(&self) -> &gst::Pad690     pub fn gst_pad(&self) -> &gst::Pad {
691         &self.gst_pad
692     }
693 }
694 
695 /// A [`PadSink`] which can be moved in `Handler`s functions and `Future`s.
696 ///
697 /// Call [`upgrade`] to use the [`PadSink`].
698 ///
699 /// *See the [`pad` module] documentation for a description of the model.*
700 ///
701 /// [`PadSink`]: struct.PadSink.html
702 /// [`upgrade`]: struct.PadSinkWeak.html#method.upgrade
703 /// [`pad` module]: index.html
704 #[derive(Clone, Debug)]
705 pub struct PadSinkWeak(Weak<PadSinkInner>);
706 
707 impl PadSinkWeak {
upgrade(&self) -> Option<PadSinkRef<'_>>708     pub fn upgrade(&self) -> Option<PadSinkRef<'_>> {
709         self.0.upgrade().map(PadSinkRef::new)
710     }
711 }
712 
713 /// A [`PadSink`] to be used in [`handler`]s functions and `Future`s.
714 ///
715 /// Call [`downgrade`] if you need to `clone` the [`PadSink`].
716 ///
717 /// *See the [`pad` module] documentation for a description of the model.*
718 ///
719 /// [`PadSink`]: struct.PadSink.html
720 /// [`handler`]: trait.PadSinkHandler.html
721 /// [`downgrade`]: struct.PadSinkRef.html#method.downgrade
722 /// [`pad` module]: index.html
723 #[derive(Debug)]
724 pub struct PadSinkRef<'a> {
725     strong: Arc<PadSinkInner>,
726     phantom: PhantomData<&'a Self>,
727 }
728 
729 impl<'a> PadSinkRef<'a> {
new(inner_arc: Arc<PadSinkInner>) -> Self730     fn new(inner_arc: Arc<PadSinkInner>) -> Self {
731         PadSinkRef {
732             strong: inner_arc,
733             phantom: PhantomData,
734         }
735     }
736 
downgrade(&self) -> PadSinkWeak737     pub fn downgrade(&self) -> PadSinkWeak {
738         PadSinkWeak(Arc::downgrade(&self.strong))
739     }
740 
activate_mode_hook( &self, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError>741     fn activate_mode_hook(
742         &self,
743         mode: gst::PadMode,
744         active: bool,
745     ) -> Result<(), gst::LoggableError> {
746         // Important: don't panic here as the hook is used without `catch_panic_pad_function`
747         // in the default `activatemode` handling
748         gst_log!(RUNTIME_CAT, obj: self.gst_pad(), "ActivateMode {:?}, {}", mode, active);
749 
750         if mode == gst::PadMode::Pull {
751             gst_error!(RUNTIME_CAT, obj: self.gst_pad(), "Pull mode not supported by PadSink");
752             return Err(gst::loggable_error!(
753                 RUNTIME_CAT,
754                 "Pull mode not supported by PadSink"
755             ));
756         }
757 
758         Ok(())
759     }
760 
handle_future( &self, fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static, ) -> Result<FlowSuccess, FlowError>761     fn handle_future(
762         &self,
763         fut: impl Future<Output = Result<FlowSuccess, FlowError>> + Send + 'static,
764     ) -> Result<FlowSuccess, FlowError> {
765         if let Err(fut) = Context::add_sub_task(fut.map(|res| res.map(drop))) {
766             block_on_or_add_sub_task(fut.map(|res| res.map(|_| gst::FlowSuccess::Ok)))
767                 .unwrap_or(Ok(gst::FlowSuccess::Ok))
768         } else {
769             Ok(gst::FlowSuccess::Ok)
770         }
771     }
772 }
773 
774 impl<'a> Deref for PadSinkRef<'a> {
775     type Target = PadSinkInner;
776 
deref(&self) -> &Self::Target777     fn deref(&self) -> &Self::Target {
778         &self.strong
779     }
780 }
781 
782 /// The `PadSink` which `Element`s must own.
783 ///
784 /// Call [`downgrade`] if you need to `clone` the `PadSink`.
785 ///
786 /// *See the [`pad` module] documentation for a description of the model.*
787 ///
788 /// [`downgrade`]: struct.PadSink.html#method.downgrade
789 /// [`pad` module]: index.html
790 #[derive(Debug)]
791 pub struct PadSink(Arc<PadSinkInner>);
792 
793 impl PadSink {
new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self794     pub fn new(gst_pad: gst::Pad, handler: impl PadSinkHandler) -> Self {
795         let this = PadSink(Arc::new(PadSinkInner::new(gst_pad)));
796         this.init_pad_functions(handler);
797 
798         this
799     }
800 
downgrade(&self) -> PadSinkWeak801     pub fn downgrade(&self) -> PadSinkWeak {
802         PadSinkWeak(Arc::downgrade(&self.0))
803     }
804 
as_ref(&self) -> PadSinkRef<'_>805     pub fn as_ref(&self) -> PadSinkRef<'_> {
806         PadSinkRef::new(Arc::clone(&self.0))
807     }
808 
init_pad_functions<H: PadSinkHandler>(&self, handler: H)809     fn init_pad_functions<H: PadSinkHandler>(&self, handler: H) {
810         // FIXME: Do this better
811         unsafe {
812             let handler_clone = handler.clone();
813             let inner_arc = Arc::clone(&self.0);
814             self.gst_pad()
815                 .set_activate_function(move |gst_pad, parent| {
816                     let handler = handler_clone.clone();
817                     let inner_arc = inner_arc.clone();
818                     H::ElementImpl::catch_panic_pad_function(
819                         parent,
820                         || {
821                             gst_error!(RUNTIME_CAT, obj: gst_pad, "Panic in PadSink activate");
822                             Err(gst::loggable_error!(
823                                 RUNTIME_CAT,
824                                 "Panic in PadSink activate"
825                             ))
826                         },
827                         move |imp, element| {
828                             let this_ref = PadSinkRef::new(inner_arc);
829                             handler.sink_activate(
830                                 &this_ref,
831                                 imp,
832                                 element.dynamic_cast_ref::<gst::Element>().unwrap(),
833                             )
834                         },
835                     )
836                 });
837 
838             let handler_clone = handler.clone();
839             let inner_arc = Arc::clone(&self.0);
840             self.gst_pad()
841                 .set_activatemode_function(move |gst_pad, parent, mode, active| {
842                     let handler = handler_clone.clone();
843                     let inner_arc = inner_arc.clone();
844                     H::ElementImpl::catch_panic_pad_function(
845                         parent,
846                         || {
847                             gst_error!(RUNTIME_CAT, obj: gst_pad, "Panic in PadSink activatemode");
848                             Err(gst::loggable_error!(
849                                 RUNTIME_CAT,
850                                 "Panic in PadSink activatemode"
851                             ))
852                         },
853                         move |imp, element| {
854                             let this_ref = PadSinkRef::new(inner_arc);
855                             this_ref.activate_mode_hook(mode, active)?;
856 
857                             handler.sink_activatemode(
858                                 &this_ref,
859                                 imp,
860                                 element.dynamic_cast_ref::<gst::Element>().unwrap(),
861                                 mode,
862                                 active,
863                             )
864                         },
865                     )
866                 });
867 
868             let handler_clone = handler.clone();
869             let inner_arc = Arc::clone(&self.0);
870             self.gst_pad()
871                 .set_chain_function(move |_gst_pad, parent, buffer| {
872                     let handler = handler_clone.clone();
873                     let inner_arc = inner_arc.clone();
874                     H::ElementImpl::catch_panic_pad_function(
875                         parent,
876                         || Err(FlowError::Error),
877                         move |imp, element| {
878                             if Context::current_has_sub_tasks() {
879                                 let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
880                                 let handler = handler.clone();
881                                 let element =
882                                     element.clone().dynamic_cast::<gst::Element>().unwrap();
883                                 let delayed_fut = async move {
884                                     let imp = <H::ElementImpl as ObjectSubclassExt>::from_instance(
885                                         element.unsafe_cast_ref(),
886                                     );
887                                     let this_ref =
888                                         this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
889                                     handler.sink_chain(&this_ref, imp, &element, buffer).await
890                                 };
891                                 let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
892 
893                                 Ok(gst::FlowSuccess::Ok)
894                             } else {
895                                 let this_ref = PadSinkRef::new(inner_arc);
896                                 let chain_fut = handler.sink_chain(
897                                     &this_ref,
898                                     imp,
899                                     element.dynamic_cast_ref::<gst::Element>().unwrap(),
900                                     buffer,
901                                 );
902                                 this_ref.handle_future(chain_fut)
903                             }
904                         },
905                     )
906                 });
907 
908             let handler_clone = handler.clone();
909             let inner_arc = Arc::clone(&self.0);
910             self.gst_pad()
911                 .set_chain_list_function(move |_gst_pad, parent, list| {
912                     let handler = handler_clone.clone();
913                     let inner_arc = inner_arc.clone();
914                     H::ElementImpl::catch_panic_pad_function(
915                         parent,
916                         || Err(FlowError::Error),
917                         move |imp, element| {
918                             if Context::current_has_sub_tasks() {
919                                 let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
920                                 let handler = handler.clone();
921                                 let element =
922                                     element.clone().dynamic_cast::<gst::Element>().unwrap();
923                                 let delayed_fut = async move {
924                                     let imp = <H::ElementImpl as ObjectSubclassExt>::from_instance(
925                                         element.unsafe_cast_ref(),
926                                     );
927                                     let this_ref =
928                                         this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
929                                     handler
930                                         .sink_chain_list(&this_ref, imp, &element, list)
931                                         .await
932                                 };
933                                 let _ = Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
934 
935                                 Ok(gst::FlowSuccess::Ok)
936                             } else {
937                                 let this_ref = PadSinkRef::new(inner_arc);
938                                 let chain_list_fut = handler.sink_chain_list(
939                                     &this_ref,
940                                     imp,
941                                     element.dynamic_cast_ref::<gst::Element>().unwrap(),
942                                     list,
943                                 );
944                                 this_ref.handle_future(chain_list_fut)
945                             }
946                         },
947                     )
948                 });
949 
950             // No need to `set_event_function` since `set_event_full_function`
951             // overrides it and dispatches to `sink_event` when necessary
952             let handler_clone = handler.clone();
953             let inner_arc = Arc::clone(&self.0);
954             self.gst_pad()
955                 .set_event_full_function(move |_gst_pad, parent, event| {
956                     let handler = handler_clone.clone();
957                     let inner_arc = inner_arc.clone();
958                     H::ElementImpl::catch_panic_pad_function(
959                         parent,
960                         || Err(FlowError::Error),
961                         move |imp, element| {
962                             if event.is_serialized() {
963                                 if Context::current_has_sub_tasks() {
964                                     let this_weak = PadSinkWeak(Arc::downgrade(&inner_arc));
965                                     let handler = handler.clone();
966                                     let element =
967                                         element.clone().dynamic_cast::<gst::Element>().unwrap();
968                                     let delayed_fut = async move {
969                                         let imp =
970                                             <H::ElementImpl as ObjectSubclassExt>::from_instance(
971                                                 element.unsafe_cast_ref(),
972                                             );
973                                         let this_ref =
974                                             this_weak.upgrade().ok_or(gst::FlowError::Flushing)?;
975 
976                                         handler
977                                             .sink_event_full_serialized(
978                                                 &this_ref, imp, &element, event,
979                                             )
980                                             .await
981                                     };
982                                     let _ =
983                                         Context::add_sub_task(delayed_fut.map(|res| res.map(drop)));
984 
985                                     Ok(gst::FlowSuccess::Ok)
986                                 } else {
987                                     let this_ref = PadSinkRef::new(inner_arc);
988                                     let event_fut = handler.sink_event_full_serialized(
989                                         &this_ref,
990                                         imp,
991                                         element.dynamic_cast_ref::<gst::Element>().unwrap(),
992                                         event,
993                                     );
994                                     this_ref.handle_future(event_fut)
995                                 }
996                             } else {
997                                 let this_ref = PadSinkRef::new(inner_arc);
998                                 handler.sink_event_full(
999                                     &this_ref,
1000                                     imp,
1001                                     element.dynamic_cast_ref::<gst::Element>().unwrap(),
1002                                     event,
1003                                 )
1004                             }
1005                         },
1006                     )
1007                 });
1008 
1009             let inner_arc = Arc::clone(&self.0);
1010             self.gst_pad()
1011             .set_query_function(move |_gst_pad, parent, query| {
1012                 let handler = handler.clone();
1013                 let inner_arc = inner_arc.clone();
1014                 H::ElementImpl::catch_panic_pad_function(
1015                     parent,
1016                     || false,
1017                     move |imp, element| {
1018                         let this_ref = PadSinkRef::new(inner_arc);
1019                         if !query.is_serialized() {
1020                             handler.sink_query(&this_ref, imp, element.dynamic_cast_ref::<gst::Element>().unwrap(), query)
1021                         } else {
1022                             gst_fixme!(RUNTIME_CAT, obj: this_ref.gst_pad(), "Serialized Query not supported");
1023                             false
1024                         }
1025                     },
1026                 )
1027             });
1028         }
1029     }
1030 }
1031 
1032 impl Drop for PadSink {
drop(&mut self)1033     fn drop(&mut self) {
1034         // FIXME: Do this better
1035         unsafe {
1036             self.gst_pad()
1037                 .set_activate_function(move |_gst_pad, _parent| {
1038                     Err(gst::loggable_error!(
1039                         RUNTIME_CAT,
1040                         "PadSink no longer exists"
1041                     ))
1042                 });
1043             self.gst_pad()
1044                 .set_activatemode_function(move |_gst_pad, _parent, _mode, _active| {
1045                     Err(gst::loggable_error!(
1046                         RUNTIME_CAT,
1047                         "PadSink no longer exists"
1048                     ))
1049                 });
1050             self.gst_pad()
1051                 .set_chain_function(move |_gst_pad, _parent, _buffer| Err(FlowError::Flushing));
1052             self.gst_pad()
1053                 .set_chain_list_function(move |_gst_pad, _parent, _list| Err(FlowError::Flushing));
1054             self.gst_pad()
1055                 .set_event_function(move |_gst_pad, _parent, _event| false);
1056             self.gst_pad()
1057                 .set_event_full_function(move |_gst_pad, _parent, _event| Err(FlowError::Flushing));
1058             self.gst_pad()
1059                 .set_query_function(move |_gst_pad, _parent, _query| false);
1060         }
1061     }
1062 }
1063 
1064 impl Deref for PadSink {
1065     type Target = PadSinkInner;
1066 
deref(&self) -> &Self::Target1067     fn deref(&self) -> &Self::Target {
1068         &self.0
1069     }
1070 }
1071