1 // Copyright (C) 2020 Mathieu Duponchelle <mathieu@centricular.com>
2 //
3 // This library is free software; you can redistribute it and/or
4 // modify it under the terms of the GNU Library General Public
5 // License as published by the Free Software Foundation; either
6 // version 2 of the License, or (at your option) any later version.
7 //
8 // This library is distributed in the hope that it will be useful,
9 // but WITHOUT ANY WARRANTY; without even the implied warranty of
10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
11 // Library General Public License for more details.
12 //
13 // You should have received a copy of the GNU Library General Public
14 // License along with this library; if not, write to the
15 // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
16 // Boston, MA 02110-1335, USA.
17 
18 use gst::glib;
19 use gst::prelude::*;
20 use gst::subclass::prelude::*;
21 use gst::{gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning};
22 
23 use once_cell::sync::Lazy;
24 
25 use std::cmp;
26 use std::convert::TryInto;
27 use std::sync::{Mutex, MutexGuard};
28 
29 use serde::Deserialize;
30 
31 use crate::line_reader::LineReader;
32 
33 static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
34     gst::DebugCategory::new(
35         "jsongstparse",
36         gst::DebugColorFlags::empty(),
37         Some("GStreamer Json Parser Element"),
38     )
39 });
40 
41 #[derive(Debug)]
42 struct PullState {
43     need_stream_start: bool,
44     stream_id: String,
45     offset: u64,
46     duration: Option<gst::ClockTime>,
47 }
48 
49 impl PullState {
new(element: &super::JsonGstParse, pad: &gst::Pad) -> Self50     fn new(element: &super::JsonGstParse, pad: &gst::Pad) -> Self {
51         Self {
52             need_stream_start: true,
53             stream_id: pad.create_stream_id(element, Some("src")).to_string(),
54             offset: 0,
55             duration: None,
56         }
57     }
58 }
59 
60 #[derive(Debug)]
61 struct State {
62     reader: LineReader<gst::MappedBuffer<gst::buffer::Readable>>,
63     need_segment: bool,
64     need_caps: bool,
65     format: Option<String>,
66     pending_events: Vec<gst::Event>,
67     last_position: Option<gst::ClockTime>,
68     segment: gst::FormattedSegment<gst::ClockTime>,
69 
70     // Pull mode
71     pull: Option<PullState>,
72 
73     // seeking
74     seeking: bool,
75     discont: bool,
76     seek_seqnum: Option<gst::Seqnum>,
77     last_raw_line: Vec<u8>,
78     replay_last_line: bool,
79     need_flush_stop: bool,
80 }
81 
82 impl Default for State {
default() -> Self83     fn default() -> Self {
84         Self {
85             reader: LineReader::new(),
86             need_segment: true,
87             need_caps: true,
88             format: None,
89             pending_events: Vec::new(),
90             last_position: None,
91             segment: gst::FormattedSegment::<gst::ClockTime>::new(),
92             pull: None,
93             seeking: false,
94             discont: false,
95             seek_seqnum: None,
96             last_raw_line: Vec::new(),
97             replay_last_line: false,
98             need_flush_stop: false,
99         }
100     }
101 }
102 
103 #[derive(Deserialize, Debug)]
104 enum Line<'a> {
105     Header {
106         format: String,
107     },
108     Buffer {
109         pts: Option<gst::ClockTime>,
110         duration: Option<gst::ClockTime>,
111         #[serde(borrow)]
112         data: &'a serde_json::value::RawValue,
113     },
114 }
115 
116 impl State {
line(&mut self, drain: bool) -> Result<Option<Line>, (&[u8], serde_json::Error)>117     fn line(&mut self, drain: bool) -> Result<Option<Line>, (&[u8], serde_json::Error)> {
118         let line = if self.replay_last_line {
119             self.replay_last_line = false;
120             &self.last_raw_line
121         } else {
122             match self.reader.line_with_drain(drain) {
123                 None => {
124                     return Ok(None);
125                 }
126                 Some(line) => {
127                     self.last_raw_line = line.to_vec();
128                     line
129                 }
130             }
131         };
132 
133         let line: Line = serde_json::from_slice(line).map_err(|err| (line, err))?;
134 
135         Ok(Some(line))
136     }
137 
create_events(&mut self, element: &super::JsonGstParse) -> Vec<gst::Event>138     fn create_events(&mut self, element: &super::JsonGstParse) -> Vec<gst::Event> {
139         let mut events = Vec::new();
140 
141         if self.need_flush_stop {
142             let mut b = gst::event::FlushStop::builder(true);
143 
144             if let Some(seek_seqnum) = self.seek_seqnum {
145                 b = b.seqnum(seek_seqnum);
146             }
147 
148             events.push(b.build());
149             self.need_flush_stop = false;
150         }
151 
152         if let Some(pull) = &mut self.pull {
153             if pull.need_stream_start {
154                 events.push(gst::event::StreamStart::new(&pull.stream_id));
155                 pull.need_stream_start = false;
156             }
157         }
158 
159         if self.need_caps {
160             let mut caps_builder = gst::Caps::builder("application/x-json");
161 
162             if let Some(format) = &self.format {
163                 caps_builder = caps_builder.field("format", format);
164             }
165 
166             let caps = caps_builder.build();
167 
168             events.push(gst::event::Caps::new(&caps));
169             gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps);
170             self.need_caps = false;
171         }
172 
173         if self.need_segment {
174             let mut b = gst::event::Segment::builder(&self.segment);
175 
176             if let Some(seek_seqnum) = self.seek_seqnum {
177                 b = b.seqnum(seek_seqnum);
178             }
179 
180             events.push(b.build());
181             self.need_segment = false;
182         }
183 
184         events.append(&mut self.pending_events);
185         events
186     }
187 
add_buffer_metadata( &mut self, _element: &super::JsonGstParse, buffer: &mut gst::buffer::Buffer, pts: Option<gst::ClockTime>, duration: Option<gst::ClockTime>, )188     fn add_buffer_metadata(
189         &mut self,
190         _element: &super::JsonGstParse,
191         buffer: &mut gst::buffer::Buffer,
192         pts: Option<gst::ClockTime>,
193         duration: Option<gst::ClockTime>,
194     ) {
195         let buffer = buffer.get_mut().unwrap();
196 
197         self.last_position = pts.zip(duration).map(|(pts, duration)| pts + duration);
198 
199         buffer.set_pts(pts);
200 
201         if self.discont {
202             buffer.set_flags(gst::BufferFlags::DISCONT);
203             self.discont = false;
204         }
205 
206         buffer.set_duration(duration);
207     }
208 }
209 
210 pub struct JsonGstParse {
211     srcpad: gst::Pad,
212     sinkpad: gst::Pad,
213     state: Mutex<State>,
214 }
215 
216 impl JsonGstParse {
handle_buffer( &self, element: &super::JsonGstParse, buffer: Option<gst::Buffer>, ) -> Result<gst::FlowSuccess, gst::FlowError>217     fn handle_buffer(
218         &self,
219         element: &super::JsonGstParse,
220         buffer: Option<gst::Buffer>,
221     ) -> Result<gst::FlowSuccess, gst::FlowError> {
222         let mut state = self.state.lock().unwrap();
223 
224         let drain;
225         if let Some(buffer) = buffer {
226             let buffer = buffer.into_mapped_buffer_readable().map_err(|_| {
227                 gst::element_error!(
228                     element,
229                     gst::ResourceError::Read,
230                     ["Failed to map buffer readable"]
231                 );
232 
233                 gst::FlowError::Error
234             })?;
235 
236             state.reader.push(buffer);
237             drain = false;
238         } else {
239             drain = true;
240         }
241 
242         loop {
243             let seeking = state.seeking;
244             let line = state.line(drain);
245             match line {
246                 Ok(Some(Line::Buffer {
247                     pts,
248                     duration,
249                     data,
250                 })) => {
251                     gst_debug!(
252                         CAT,
253                         obj: element,
254                         "Got buffer with timestamp {} and duration {}",
255                         pts.display(),
256                         duration.display(),
257                     );
258 
259                     if !seeking {
260                         let data = data.to_string().clone();
261                         let mut events = state.create_events(element);
262 
263                         let mut buffer = gst::Buffer::from_slice(data);
264 
265                         if let Some(last_position) = state.last_position {
266                             if let Some(duration) = pts.map(|pts| pts.checked_sub(last_position)) {
267                                 events.push(
268                                     gst::event::Gap::builder(last_position)
269                                         .duration(duration)
270                                         .build(),
271                                 );
272                             }
273                         }
274 
275                         state.add_buffer_metadata(element, &mut buffer, pts, duration);
276 
277                         let send_eos = state
278                             .segment
279                             .stop()
280                             .zip(buffer.pts())
281                             .zip(buffer.duration())
282                             .map_or(false, |((stop, pts), duration)| pts + duration >= stop);
283 
284                         // Drop our state mutex while we push out buffers or events
285                         drop(state);
286 
287                         for event in events {
288                             gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
289                             self.srcpad.push_event(event);
290                         }
291 
292                         self.srcpad.push(buffer).map_err(|err| {
293                             if err != gst::FlowError::Flushing {
294                                 gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err);
295                             }
296                             err
297                         })?;
298 
299                         if send_eos {
300                             return Err(gst::FlowError::Eos);
301                         }
302 
303                         state = self.state.lock().unwrap();
304                     } else {
305                         state = self.handle_skipped_line(element, pts, state);
306                     }
307                 }
308                 Ok(Some(Line::Header { format })) => {
309                     if state.format.is_none() {
310                         state.format = Some(format);
311                     } else {
312                         gst_warning!(CAT, obj: element, "Ignoring format change",);
313                     }
314                 }
315                 Err((line, err)) => {
316                     gst_error!(
317                         CAT,
318                         obj: element,
319                         "Couldn't parse line '{:?}': {:?}",
320                         std::str::from_utf8(line),
321                         err
322                     );
323 
324                     gst::element_error!(
325                         element,
326                         gst::StreamError::Decode,
327                         ["Couldn't parse line '{:?}': {:?}", line, err]
328                     );
329 
330                     break Err(gst::FlowError::Error);
331                 }
332                 Ok(None) => {
333                     if drain && state.pull.is_some() {
334                         eprintln!("Finished draining");
335                         break Err(gst::FlowError::Eos);
336                     }
337                     break Ok(gst::FlowSuccess::Ok);
338                 }
339             }
340         }
341     }
342 
handle_skipped_line( &self, element: &super::JsonGstParse, pts: impl Into<Option<gst::ClockTime>>, mut state: MutexGuard<State>, ) -> MutexGuard<State>343     fn handle_skipped_line(
344         &self,
345         element: &super::JsonGstParse,
346         pts: impl Into<Option<gst::ClockTime>>,
347         mut state: MutexGuard<State>,
348     ) -> MutexGuard<State> {
349         if pts
350             .into()
351             .zip(state.segment.start())
352             .map_or(false, |(pts, start)| pts >= start)
353         {
354             state.seeking = false;
355             state.discont = true;
356             state.replay_last_line = true;
357             state.need_flush_stop = true;
358 
359             gst_debug!(CAT, obj: element, "Done seeking");
360         }
361 
362         drop(state);
363 
364         self.state.lock().unwrap()
365     }
366 
sink_activate( &self, pad: &gst::Pad, element: &super::JsonGstParse, ) -> Result<(), gst::LoggableError>367     fn sink_activate(
368         &self,
369         pad: &gst::Pad,
370         element: &super::JsonGstParse,
371     ) -> Result<(), gst::LoggableError> {
372         let mode = {
373             let mut query = gst::query::Scheduling::new();
374             let mut state = self.state.lock().unwrap();
375 
376             state.pull = None;
377 
378             if !pad.peer_query(&mut query) {
379                 gst_debug!(CAT, obj: pad, "Scheduling query failed on peer");
380                 gst::PadMode::Push
381             } else if query
382                 .has_scheduling_mode_with_flags(gst::PadMode::Pull, gst::SchedulingFlags::SEEKABLE)
383             {
384                 gst_debug!(CAT, obj: pad, "Activating in Pull mode");
385 
386                 state.pull = Some(PullState::new(element, &self.srcpad));
387 
388                 gst::PadMode::Pull
389             } else {
390                 gst_debug!(CAT, obj: pad, "Activating in Push mode");
391                 gst::PadMode::Push
392             }
393         };
394 
395         pad.activate_mode(mode, true)?;
396         Ok(())
397     }
398 
start_task(&self, element: &super::JsonGstParse) -> Result<(), gst::LoggableError>399     fn start_task(&self, element: &super::JsonGstParse) -> Result<(), gst::LoggableError> {
400         let element_weak = element.downgrade();
401         let pad_weak = self.sinkpad.downgrade();
402         let res = self.sinkpad.start_task(move || {
403             let element = match element_weak.upgrade() {
404                 Some(element) => element,
405                 None => {
406                     if let Some(pad) = pad_weak.upgrade() {
407                         pad.pause_task().unwrap();
408                     }
409                     return;
410                 }
411             };
412 
413             let parse = Self::from_instance(&element);
414             parse.loop_fn(&element);
415         });
416         if res.is_err() {
417             return Err(gst::loggable_error!(CAT, "Failed to start pad task"));
418         }
419         Ok(())
420     }
421 
sink_activatemode( &self, _pad: &gst::Pad, element: &super::JsonGstParse, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError>422     fn sink_activatemode(
423         &self,
424         _pad: &gst::Pad,
425         element: &super::JsonGstParse,
426         mode: gst::PadMode,
427         active: bool,
428     ) -> Result<(), gst::LoggableError> {
429         if mode == gst::PadMode::Pull {
430             if active {
431                 self.start_task(element)?;
432             } else {
433                 let _ = self.sinkpad.stop_task();
434             }
435         }
436 
437         Ok(())
438     }
439 
scan_duration( &self, element: &super::JsonGstParse, ) -> Result<Option<gst::ClockTime>, gst::LoggableError>440     fn scan_duration(
441         &self,
442         element: &super::JsonGstParse,
443     ) -> Result<Option<gst::ClockTime>, gst::LoggableError> {
444         gst_debug!(CAT, obj: element, "Scanning duration");
445 
446         /* First let's query the bytes duration upstream */
447         let mut q = gst::query::Duration::new(gst::Format::Bytes);
448 
449         if !self.sinkpad.peer_query(&mut q) {
450             return Err(gst::loggable_error!(
451                 CAT,
452                 "Failed to query upstream duration"
453             ));
454         }
455 
456         let size = match q.result().try_into().ok().flatten() {
457             Some(gst::format::Bytes(size)) => size,
458             None => {
459                 return Err(gst::loggable_error!(
460                     CAT,
461                     "Failed to query upstream duration"
462                 ));
463             }
464         };
465 
466         let mut offset = size;
467         let mut buffers = Vec::new();
468         let mut last_pts = None;
469 
470         loop {
471             let scan_size = cmp::min(offset, 4096);
472 
473             offset -= scan_size;
474 
475             match self.sinkpad.pull_range(offset, scan_size as u32) {
476                 Ok(buffer) => {
477                     buffers.push(buffer);
478                 }
479                 Err(flow) => {
480                     return Err(gst::loggable_error!(
481                         CAT,
482                         "Failed to pull buffer while scanning duration: {:?}",
483                         flow
484                     ));
485                 }
486             }
487 
488             let mut reader = LineReader::new();
489 
490             for buf in buffers.iter().rev() {
491                 let buf = buf
492                     .clone()
493                     .into_mapped_buffer_readable()
494                     .map_err(|_| gst::loggable_error!(CAT, "Failed to map buffer readable"))?;
495 
496                 reader.push(buf);
497             }
498 
499             while let Some(line) = reader.line_with_drain(true) {
500                 if let Ok(Line::Buffer {
501                     pts,
502                     duration,
503                     data: _data,
504                 }) = serde_json::from_slice(line)
505                 {
506                     last_pts = pts.zip(duration).map(|(pts, duration)| pts + duration);
507                 }
508             }
509 
510             if last_pts.is_some() || offset == 0 {
511                 gst_debug!(
512                     CAT,
513                     obj: element,
514                     "Duration scan done, last_pts: {:?}",
515                     last_pts
516                 );
517                 break (Ok(last_pts));
518             }
519         }
520     }
521 
push_eos(&self, element: &super::JsonGstParse)522     fn push_eos(&self, element: &super::JsonGstParse) {
523         let mut state = self.state.lock().unwrap();
524 
525         if state.seeking {
526             state.need_flush_stop = true;
527         }
528 
529         let mut events = state.create_events(element);
530         let mut eos_event = gst::event::Eos::builder();
531 
532         if let Some(seek_seqnum) = state.seek_seqnum {
533             eos_event = eos_event.seqnum(seek_seqnum);
534         }
535 
536         events.push(eos_event.build());
537 
538         // Drop our state mutex while we push out events
539         drop(state);
540 
541         for event in events {
542             gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
543             self.srcpad.push_event(event);
544         }
545     }
546 
loop_fn(&self, element: &super::JsonGstParse)547     fn loop_fn(&self, element: &super::JsonGstParse) {
548         let mut state = self.state.lock().unwrap();
549         let State { ref mut pull, .. } = *state;
550         let mut pull = pull.as_mut().unwrap();
551         let offset = pull.offset;
552         let scan_duration = pull.duration.is_none();
553 
554         pull.offset += 4096;
555 
556         drop(state);
557 
558         if scan_duration {
559             match self.scan_duration(element) {
560                 Ok(pts) => {
561                     let mut state = self.state.lock().unwrap();
562                     let mut pull = state.pull.as_mut().unwrap();
563                     pull.duration = pts;
564                 }
565                 Err(err) => {
566                     err.log();
567 
568                     gst::element_error!(
569                         element,
570                         gst::StreamError::Decode,
571                         ["Failed to scan duration"]
572                     );
573 
574                     self.sinkpad.pause_task().unwrap();
575                 }
576             }
577         }
578 
579         let buffer = match self.sinkpad.pull_range(offset, 4096) {
580             Ok(buffer) => Some(buffer),
581             Err(gst::FlowError::Eos) => None,
582             Err(gst::FlowError::Flushing) => {
583                 gst_debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing");
584 
585                 self.sinkpad.pause_task().unwrap();
586                 return;
587             }
588             Err(flow) => {
589                 gst_error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow);
590 
591                 gst::element_error!(
592                     element,
593                     gst::StreamError::Failed,
594                     ["Streaming stopped, failed to pull buffer"]
595                 );
596 
597                 self.sinkpad.pause_task().unwrap();
598                 return;
599             }
600         };
601 
602         if let Err(flow) = self.handle_buffer(element, buffer) {
603             match flow {
604                 gst::FlowError::Flushing => {
605                     gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow);
606                 }
607                 gst::FlowError::Eos => {
608                     self.push_eos(element);
609 
610                     gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow);
611                 }
612                 _ => {
613                     self.push_eos(element);
614 
615                     gst_error!(CAT, obj: element, "Pausing after flow {:?}", flow);
616 
617                     gst::element_error!(
618                         element,
619                         gst::StreamError::Failed,
620                         ["Streaming stopped, reason: {:?}", flow]
621                     );
622                 }
623             }
624 
625             self.sinkpad.pause_task().unwrap();
626         }
627     }
628 
sink_chain( &self, pad: &gst::Pad, element: &super::JsonGstParse, buffer: gst::Buffer, ) -> Result<gst::FlowSuccess, gst::FlowError>629     fn sink_chain(
630         &self,
631         pad: &gst::Pad,
632         element: &super::JsonGstParse,
633         buffer: gst::Buffer,
634     ) -> Result<gst::FlowSuccess, gst::FlowError> {
635         gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
636 
637         self.handle_buffer(element, Some(buffer))
638     }
639 
flush(&self, mut state: &mut State)640     fn flush(&self, mut state: &mut State) {
641         state.reader.clear();
642         if let Some(pull) = &mut state.pull {
643             pull.offset = 0;
644         }
645         state.segment = gst::FormattedSegment::<gst::ClockTime>::new();
646         state.need_segment = true;
647         state.need_caps = true;
648         state.pending_events.clear();
649         state.last_position = None;
650         state.last_raw_line = [].to_vec();
651         state.format = None;
652     }
653 
sink_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool654     fn sink_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool {
655         use gst::EventView;
656 
657         gst_log!(CAT, obj: pad, "Handling event {:?}", event);
658 
659         match event.view() {
660             EventView::Caps(_) => {
661                 // We send a proper caps event from the chain function later
662                 gst_log!(CAT, obj: pad, "Dropping caps event");
663                 true
664             }
665             EventView::Segment(_) => {
666                 // We send a gst::Format::Time segment event later when needed
667                 gst_log!(CAT, obj: pad, "Dropping segment event");
668                 true
669             }
670             EventView::FlushStop(_) => {
671                 let mut state = self.state.lock().unwrap();
672                 self.flush(&mut state);
673                 drop(state);
674 
675                 pad.event_default(Some(element), event)
676             }
677             EventView::Eos(_) => {
678                 gst_log!(CAT, obj: pad, "Draining");
679                 if let Err(err) = self.handle_buffer(element, None) {
680                     gst_error!(CAT, obj: pad, "Failed to drain parser: {:?}", err);
681                 }
682                 pad.event_default(Some(element), event)
683             }
684             _ => {
685                 if event.is_sticky()
686                     && !self.srcpad.has_current_caps()
687                     && event.type_() > gst::EventType::Caps
688                 {
689                     gst_log!(CAT, obj: pad, "Deferring sticky event until we have caps");
690                     let mut state = self.state.lock().unwrap();
691                     state.pending_events.push(event);
692                     true
693                 } else {
694                     pad.event_default(Some(element), event)
695                 }
696             }
697         }
698     }
699 
perform_seek(&self, event: &gst::event::Seek, element: &super::JsonGstParse) -> bool700     fn perform_seek(&self, event: &gst::event::Seek, element: &super::JsonGstParse) -> bool {
701         if self.state.lock().unwrap().pull.is_none() {
702             gst_error!(CAT, obj: element, "seeking is only supported in pull mode");
703             return false;
704         }
705 
706         let (rate, flags, start_type, start, stop_type, stop) = event.get();
707 
708         let mut start: Option<gst::ClockTime> = match start.try_into() {
709             Ok(start) => start,
710             Err(_) => {
711                 gst_error!(CAT, obj: element, "seek has invalid format");
712                 return false;
713             }
714         };
715 
716         let mut stop: Option<gst::ClockTime> = match stop.try_into() {
717             Ok(stop) => stop,
718             Err(_) => {
719                 gst_error!(CAT, obj: element, "seek has invalid format");
720                 return false;
721             }
722         };
723 
724         if !flags.contains(gst::SeekFlags::FLUSH) {
725             gst_error!(CAT, obj: element, "only flushing seeks are supported");
726             return false;
727         }
728 
729         if start_type == gst::SeekType::End || stop_type == gst::SeekType::End {
730             gst_error!(CAT, obj: element, "Relative seeks are not supported");
731             return false;
732         }
733 
734         let seek_seqnum = event.seqnum();
735 
736         let event = gst::event::FlushStart::builder()
737             .seqnum(seek_seqnum)
738             .build();
739 
740         gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event);
741         self.sinkpad.push_event(event);
742 
743         let event = gst::event::FlushStart::builder()
744             .seqnum(seek_seqnum)
745             .build();
746 
747         gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
748         self.srcpad.push_event(event);
749 
750         self.sinkpad.pause_task().unwrap();
751 
752         let mut state = self.state.lock().unwrap();
753         let pull = state.pull.as_ref().unwrap();
754 
755         if start_type == gst::SeekType::Set {
756             start = start
757                 .zip(pull.duration)
758                 .map(|(start, duration)| start.min(duration))
759                 .or(start);
760         }
761 
762         if stop_type == gst::SeekType::Set {
763             stop = stop
764                 .zip(pull.duration)
765                 .map(|(stop, duration)| stop.min(duration))
766                 .or(stop);
767         }
768 
769         state.seeking = true;
770         state.seek_seqnum = Some(seek_seqnum);
771 
772         self.flush(&mut state);
773 
774         let event = gst::event::FlushStop::builder(true)
775             .seqnum(seek_seqnum)
776             .build();
777 
778         /* Drop our state while we push a serialized event upstream */
779         drop(state);
780 
781         gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event);
782         self.sinkpad.push_event(event);
783 
784         state = self.state.lock().unwrap();
785 
786         state
787             .segment
788             .do_seek(rate, flags, start_type, start, stop_type, stop);
789 
790         match self.start_task(element) {
791             Err(error) => {
792                 error.log();
793                 false
794             }
795             _ => true,
796         }
797     }
798 
src_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool799     fn src_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool {
800         use gst::EventView;
801 
802         gst_log!(CAT, obj: pad, "Handling event {:?}", event);
803         match event.view() {
804             EventView::Seek(e) => self.perform_seek(&e, element),
805             _ => pad.event_default(Some(element), event),
806         }
807     }
808 
src_query( &self, pad: &gst::Pad, element: &super::JsonGstParse, query: &mut gst::QueryRef, ) -> bool809     fn src_query(
810         &self,
811         pad: &gst::Pad,
812         element: &super::JsonGstParse,
813         query: &mut gst::QueryRef,
814     ) -> bool {
815         use gst::QueryView;
816 
817         gst_log!(CAT, obj: pad, "Handling query {:?}", query);
818 
819         match query.view_mut() {
820             QueryView::Seeking(mut q) => {
821                 let state = self.state.lock().unwrap();
822 
823                 let fmt = q.format();
824 
825                 if fmt == gst::Format::Time {
826                     if let Some(pull) = state.pull.as_ref() {
827                         q.set(
828                             true,
829                             gst::GenericFormattedValue::Time(Some(gst::ClockTime::ZERO)),
830                             gst::GenericFormattedValue::Time(pull.duration),
831                         );
832                         true
833                     } else {
834                         false
835                     }
836                 } else {
837                     false
838                 }
839             }
840             QueryView::Position(ref mut q) => {
841                 // For Time answer ourselfs, otherwise forward
842                 if q.format() == gst::Format::Time {
843                     let state = self.state.lock().unwrap();
844                     q.set(state.last_position);
845                     true
846                 } else {
847                     self.sinkpad.peer_query(query)
848                 }
849             }
850             QueryView::Duration(ref mut q) => {
851                 // For Time answer ourselfs, otherwise forward
852                 let state = self.state.lock().unwrap();
853                 if q.format() == gst::Format::Time {
854                     if let Some(pull) = state.pull.as_ref() {
855                         if pull.duration.is_some() {
856                             q.set(state.pull.as_ref().unwrap().duration);
857                             true
858                         } else {
859                             false
860                         }
861                     } else {
862                         false
863                     }
864                 } else {
865                     self.sinkpad.peer_query(query)
866                 }
867             }
868             _ => pad.query_default(Some(element), query),
869         }
870     }
871 }
872 
873 #[glib::object_subclass]
874 impl ObjectSubclass for JsonGstParse {
875     const NAME: &'static str = "RsJsonGstParse";
876     type Type = super::JsonGstParse;
877     type ParentType = gst::Element;
878 
with_class(klass: &Self::Class) -> Self879     fn with_class(klass: &Self::Class) -> Self {
880         let templ = klass.pad_template("sink").unwrap();
881         let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
882             .activate_function(|pad, parent| {
883                 JsonGstParse::catch_panic_pad_function(
884                     parent,
885                     || Err(gst::loggable_error!(CAT, "Panic activating sink pad")),
886                     |parse, element| parse.sink_activate(pad, element),
887                 )
888             })
889             .activatemode_function(|pad, parent, mode, active| {
890                 JsonGstParse::catch_panic_pad_function(
891                     parent,
892                     || {
893                         Err(gst::loggable_error!(
894                             CAT,
895                             "Panic activating sink pad with mode"
896                         ))
897                     },
898                     |parse, element| parse.sink_activatemode(pad, element, mode, active),
899                 )
900             })
901             .chain_function(|pad, parent, buffer| {
902                 JsonGstParse::catch_panic_pad_function(
903                     parent,
904                     || Err(gst::FlowError::Error),
905                     |parse, element| parse.sink_chain(pad, element, buffer),
906                 )
907             })
908             .event_function(|pad, parent, event| {
909                 JsonGstParse::catch_panic_pad_function(
910                     parent,
911                     || false,
912                     |parse, element| parse.sink_event(pad, element, event),
913                 )
914             })
915             .build();
916 
917         let templ = klass.pad_template("src").unwrap();
918         let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
919             .event_function(|pad, parent, event| {
920                 JsonGstParse::catch_panic_pad_function(
921                     parent,
922                     || false,
923                     |parse, element| parse.src_event(pad, element, event),
924                 )
925             })
926             .query_function(|pad, parent, query| {
927                 JsonGstParse::catch_panic_pad_function(
928                     parent,
929                     || false,
930                     |parse, element| parse.src_query(pad, element, query),
931                 )
932             })
933             .build();
934 
935         Self {
936             srcpad,
937             sinkpad,
938             state: Mutex::new(State::default()),
939         }
940     }
941 }
942 
943 impl ObjectImpl for JsonGstParse {
constructed(&self, obj: &Self::Type)944     fn constructed(&self, obj: &Self::Type) {
945         self.parent_constructed(obj);
946 
947         obj.add_pad(&self.sinkpad).unwrap();
948         obj.add_pad(&self.srcpad).unwrap();
949     }
950 }
951 
952 impl ElementImpl for JsonGstParse {
metadata() -> Option<&'static gst::subclass::ElementMetadata>953     fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
954         static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
955             gst::subclass::ElementMetadata::new(
956                 "JSON GStreamer parser",
957                 "Parser/JSON",
958                 "Parses ndjson as output by jsongstenc",
959                 "Mathieu Duponchelle <mathieu@centricular.com>",
960             )
961         });
962 
963         Some(&*ELEMENT_METADATA)
964     }
965 
pad_templates() -> &'static [gst::PadTemplate]966     fn pad_templates() -> &'static [gst::PadTemplate] {
967         static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
968             let caps = gst::Caps::builder("application/x-json").build();
969             let src_pad_template = gst::PadTemplate::new(
970                 "src",
971                 gst::PadDirection::Src,
972                 gst::PadPresence::Always,
973                 &caps,
974             )
975             .unwrap();
976 
977             let caps = gst::Caps::new_any();
978             let sink_pad_template = gst::PadTemplate::new(
979                 "sink",
980                 gst::PadDirection::Sink,
981                 gst::PadPresence::Always,
982                 &caps,
983             )
984             .unwrap();
985 
986             vec![src_pad_template, sink_pad_template]
987         });
988 
989         PAD_TEMPLATES.as_ref()
990     }
991 
change_state( &self, element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError>992     fn change_state(
993         &self,
994         element: &Self::Type,
995         transition: gst::StateChange,
996     ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
997         gst_trace!(CAT, obj: element, "Changing state {:?}", transition);
998 
999         match transition {
1000             gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => {
1001                 // Reset the whole state
1002                 let mut state = self.state.lock().unwrap();
1003                 *state = State::default();
1004             }
1005             _ => (),
1006         }
1007 
1008         self.parent_change_state(element, transition)
1009     }
1010 }
1011