1 // Copyright (C) 2020 Mathieu Duponchelle <mathieu@centricular.com> 2 // 3 // This library is free software; you can redistribute it and/or 4 // modify it under the terms of the GNU Library General Public 5 // License as published by the Free Software Foundation; either 6 // version 2 of the License, or (at your option) any later version. 7 // 8 // This library is distributed in the hope that it will be useful, 9 // but WITHOUT ANY WARRANTY; without even the implied warranty of 10 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 11 // Library General Public License for more details. 12 // 13 // You should have received a copy of the GNU Library General Public 14 // License along with this library; if not, write to the 15 // Free Software Foundation, Inc., 51 Franklin Street, Suite 500, 16 // Boston, MA 02110-1335, USA. 17 18 use gst::glib; 19 use gst::prelude::*; 20 use gst::subclass::prelude::*; 21 use gst::{gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning}; 22 23 use once_cell::sync::Lazy; 24 25 use std::cmp; 26 use std::convert::TryInto; 27 use std::sync::{Mutex, MutexGuard}; 28 29 use serde::Deserialize; 30 31 use crate::line_reader::LineReader; 32 33 static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { 34 gst::DebugCategory::new( 35 "jsongstparse", 36 gst::DebugColorFlags::empty(), 37 Some("GStreamer Json Parser Element"), 38 ) 39 }); 40 41 #[derive(Debug)] 42 struct PullState { 43 need_stream_start: bool, 44 stream_id: String, 45 offset: u64, 46 duration: Option<gst::ClockTime>, 47 } 48 49 impl PullState { new(element: &super::JsonGstParse, pad: &gst::Pad) -> Self50 fn new(element: &super::JsonGstParse, pad: &gst::Pad) -> Self { 51 Self { 52 need_stream_start: true, 53 stream_id: pad.create_stream_id(element, Some("src")).to_string(), 54 offset: 0, 55 duration: None, 56 } 57 } 58 } 59 60 #[derive(Debug)] 61 struct State { 62 reader: LineReader<gst::MappedBuffer<gst::buffer::Readable>>, 63 need_segment: bool, 64 need_caps: bool, 65 format: Option<String>, 66 pending_events: Vec<gst::Event>, 67 last_position: Option<gst::ClockTime>, 68 segment: gst::FormattedSegment<gst::ClockTime>, 69 70 // Pull mode 71 pull: Option<PullState>, 72 73 // seeking 74 seeking: bool, 75 discont: bool, 76 seek_seqnum: Option<gst::Seqnum>, 77 last_raw_line: Vec<u8>, 78 replay_last_line: bool, 79 need_flush_stop: bool, 80 } 81 82 impl Default for State { default() -> Self83 fn default() -> Self { 84 Self { 85 reader: LineReader::new(), 86 need_segment: true, 87 need_caps: true, 88 format: None, 89 pending_events: Vec::new(), 90 last_position: None, 91 segment: gst::FormattedSegment::<gst::ClockTime>::new(), 92 pull: None, 93 seeking: false, 94 discont: false, 95 seek_seqnum: None, 96 last_raw_line: Vec::new(), 97 replay_last_line: false, 98 need_flush_stop: false, 99 } 100 } 101 } 102 103 #[derive(Deserialize, Debug)] 104 enum Line<'a> { 105 Header { 106 format: String, 107 }, 108 Buffer { 109 pts: Option<gst::ClockTime>, 110 duration: Option<gst::ClockTime>, 111 #[serde(borrow)] 112 data: &'a serde_json::value::RawValue, 113 }, 114 } 115 116 impl State { line(&mut self, drain: bool) -> Result<Option<Line>, (&[u8], serde_json::Error)>117 fn line(&mut self, drain: bool) -> Result<Option<Line>, (&[u8], serde_json::Error)> { 118 let line = if self.replay_last_line { 119 self.replay_last_line = false; 120 &self.last_raw_line 121 } else { 122 match self.reader.line_with_drain(drain) { 123 None => { 124 return Ok(None); 125 } 126 Some(line) => { 127 self.last_raw_line = line.to_vec(); 128 line 129 } 130 } 131 }; 132 133 let line: Line = serde_json::from_slice(line).map_err(|err| (line, err))?; 134 135 Ok(Some(line)) 136 } 137 create_events(&mut self, element: &super::JsonGstParse) -> Vec<gst::Event>138 fn create_events(&mut self, element: &super::JsonGstParse) -> Vec<gst::Event> { 139 let mut events = Vec::new(); 140 141 if self.need_flush_stop { 142 let mut b = gst::event::FlushStop::builder(true); 143 144 if let Some(seek_seqnum) = self.seek_seqnum { 145 b = b.seqnum(seek_seqnum); 146 } 147 148 events.push(b.build()); 149 self.need_flush_stop = false; 150 } 151 152 if let Some(pull) = &mut self.pull { 153 if pull.need_stream_start { 154 events.push(gst::event::StreamStart::new(&pull.stream_id)); 155 pull.need_stream_start = false; 156 } 157 } 158 159 if self.need_caps { 160 let mut caps_builder = gst::Caps::builder("application/x-json"); 161 162 if let Some(format) = &self.format { 163 caps_builder = caps_builder.field("format", format); 164 } 165 166 let caps = caps_builder.build(); 167 168 events.push(gst::event::Caps::new(&caps)); 169 gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps); 170 self.need_caps = false; 171 } 172 173 if self.need_segment { 174 let mut b = gst::event::Segment::builder(&self.segment); 175 176 if let Some(seek_seqnum) = self.seek_seqnum { 177 b = b.seqnum(seek_seqnum); 178 } 179 180 events.push(b.build()); 181 self.need_segment = false; 182 } 183 184 events.append(&mut self.pending_events); 185 events 186 } 187 add_buffer_metadata( &mut self, _element: &super::JsonGstParse, buffer: &mut gst::buffer::Buffer, pts: Option<gst::ClockTime>, duration: Option<gst::ClockTime>, )188 fn add_buffer_metadata( 189 &mut self, 190 _element: &super::JsonGstParse, 191 buffer: &mut gst::buffer::Buffer, 192 pts: Option<gst::ClockTime>, 193 duration: Option<gst::ClockTime>, 194 ) { 195 let buffer = buffer.get_mut().unwrap(); 196 197 self.last_position = pts.zip(duration).map(|(pts, duration)| pts + duration); 198 199 buffer.set_pts(pts); 200 201 if self.discont { 202 buffer.set_flags(gst::BufferFlags::DISCONT); 203 self.discont = false; 204 } 205 206 buffer.set_duration(duration); 207 } 208 } 209 210 pub struct JsonGstParse { 211 srcpad: gst::Pad, 212 sinkpad: gst::Pad, 213 state: Mutex<State>, 214 } 215 216 impl JsonGstParse { handle_buffer( &self, element: &super::JsonGstParse, buffer: Option<gst::Buffer>, ) -> Result<gst::FlowSuccess, gst::FlowError>217 fn handle_buffer( 218 &self, 219 element: &super::JsonGstParse, 220 buffer: Option<gst::Buffer>, 221 ) -> Result<gst::FlowSuccess, gst::FlowError> { 222 let mut state = self.state.lock().unwrap(); 223 224 let drain; 225 if let Some(buffer) = buffer { 226 let buffer = buffer.into_mapped_buffer_readable().map_err(|_| { 227 gst::element_error!( 228 element, 229 gst::ResourceError::Read, 230 ["Failed to map buffer readable"] 231 ); 232 233 gst::FlowError::Error 234 })?; 235 236 state.reader.push(buffer); 237 drain = false; 238 } else { 239 drain = true; 240 } 241 242 loop { 243 let seeking = state.seeking; 244 let line = state.line(drain); 245 match line { 246 Ok(Some(Line::Buffer { 247 pts, 248 duration, 249 data, 250 })) => { 251 gst_debug!( 252 CAT, 253 obj: element, 254 "Got buffer with timestamp {} and duration {}", 255 pts.display(), 256 duration.display(), 257 ); 258 259 if !seeking { 260 let data = data.to_string().clone(); 261 let mut events = state.create_events(element); 262 263 let mut buffer = gst::Buffer::from_slice(data); 264 265 if let Some(last_position) = state.last_position { 266 if let Some(duration) = pts.map(|pts| pts.checked_sub(last_position)) { 267 events.push( 268 gst::event::Gap::builder(last_position) 269 .duration(duration) 270 .build(), 271 ); 272 } 273 } 274 275 state.add_buffer_metadata(element, &mut buffer, pts, duration); 276 277 let send_eos = state 278 .segment 279 .stop() 280 .zip(buffer.pts()) 281 .zip(buffer.duration()) 282 .map_or(false, |((stop, pts), duration)| pts + duration >= stop); 283 284 // Drop our state mutex while we push out buffers or events 285 drop(state); 286 287 for event in events { 288 gst_debug!(CAT, obj: element, "Pushing event {:?}", event); 289 self.srcpad.push_event(event); 290 } 291 292 self.srcpad.push(buffer).map_err(|err| { 293 if err != gst::FlowError::Flushing { 294 gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err); 295 } 296 err 297 })?; 298 299 if send_eos { 300 return Err(gst::FlowError::Eos); 301 } 302 303 state = self.state.lock().unwrap(); 304 } else { 305 state = self.handle_skipped_line(element, pts, state); 306 } 307 } 308 Ok(Some(Line::Header { format })) => { 309 if state.format.is_none() { 310 state.format = Some(format); 311 } else { 312 gst_warning!(CAT, obj: element, "Ignoring format change",); 313 } 314 } 315 Err((line, err)) => { 316 gst_error!( 317 CAT, 318 obj: element, 319 "Couldn't parse line '{:?}': {:?}", 320 std::str::from_utf8(line), 321 err 322 ); 323 324 gst::element_error!( 325 element, 326 gst::StreamError::Decode, 327 ["Couldn't parse line '{:?}': {:?}", line, err] 328 ); 329 330 break Err(gst::FlowError::Error); 331 } 332 Ok(None) => { 333 if drain && state.pull.is_some() { 334 eprintln!("Finished draining"); 335 break Err(gst::FlowError::Eos); 336 } 337 break Ok(gst::FlowSuccess::Ok); 338 } 339 } 340 } 341 } 342 handle_skipped_line( &self, element: &super::JsonGstParse, pts: impl Into<Option<gst::ClockTime>>, mut state: MutexGuard<State>, ) -> MutexGuard<State>343 fn handle_skipped_line( 344 &self, 345 element: &super::JsonGstParse, 346 pts: impl Into<Option<gst::ClockTime>>, 347 mut state: MutexGuard<State>, 348 ) -> MutexGuard<State> { 349 if pts 350 .into() 351 .zip(state.segment.start()) 352 .map_or(false, |(pts, start)| pts >= start) 353 { 354 state.seeking = false; 355 state.discont = true; 356 state.replay_last_line = true; 357 state.need_flush_stop = true; 358 359 gst_debug!(CAT, obj: element, "Done seeking"); 360 } 361 362 drop(state); 363 364 self.state.lock().unwrap() 365 } 366 sink_activate( &self, pad: &gst::Pad, element: &super::JsonGstParse, ) -> Result<(), gst::LoggableError>367 fn sink_activate( 368 &self, 369 pad: &gst::Pad, 370 element: &super::JsonGstParse, 371 ) -> Result<(), gst::LoggableError> { 372 let mode = { 373 let mut query = gst::query::Scheduling::new(); 374 let mut state = self.state.lock().unwrap(); 375 376 state.pull = None; 377 378 if !pad.peer_query(&mut query) { 379 gst_debug!(CAT, obj: pad, "Scheduling query failed on peer"); 380 gst::PadMode::Push 381 } else if query 382 .has_scheduling_mode_with_flags(gst::PadMode::Pull, gst::SchedulingFlags::SEEKABLE) 383 { 384 gst_debug!(CAT, obj: pad, "Activating in Pull mode"); 385 386 state.pull = Some(PullState::new(element, &self.srcpad)); 387 388 gst::PadMode::Pull 389 } else { 390 gst_debug!(CAT, obj: pad, "Activating in Push mode"); 391 gst::PadMode::Push 392 } 393 }; 394 395 pad.activate_mode(mode, true)?; 396 Ok(()) 397 } 398 start_task(&self, element: &super::JsonGstParse) -> Result<(), gst::LoggableError>399 fn start_task(&self, element: &super::JsonGstParse) -> Result<(), gst::LoggableError> { 400 let element_weak = element.downgrade(); 401 let pad_weak = self.sinkpad.downgrade(); 402 let res = self.sinkpad.start_task(move || { 403 let element = match element_weak.upgrade() { 404 Some(element) => element, 405 None => { 406 if let Some(pad) = pad_weak.upgrade() { 407 pad.pause_task().unwrap(); 408 } 409 return; 410 } 411 }; 412 413 let parse = Self::from_instance(&element); 414 parse.loop_fn(&element); 415 }); 416 if res.is_err() { 417 return Err(gst::loggable_error!(CAT, "Failed to start pad task")); 418 } 419 Ok(()) 420 } 421 sink_activatemode( &self, _pad: &gst::Pad, element: &super::JsonGstParse, mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError>422 fn sink_activatemode( 423 &self, 424 _pad: &gst::Pad, 425 element: &super::JsonGstParse, 426 mode: gst::PadMode, 427 active: bool, 428 ) -> Result<(), gst::LoggableError> { 429 if mode == gst::PadMode::Pull { 430 if active { 431 self.start_task(element)?; 432 } else { 433 let _ = self.sinkpad.stop_task(); 434 } 435 } 436 437 Ok(()) 438 } 439 scan_duration( &self, element: &super::JsonGstParse, ) -> Result<Option<gst::ClockTime>, gst::LoggableError>440 fn scan_duration( 441 &self, 442 element: &super::JsonGstParse, 443 ) -> Result<Option<gst::ClockTime>, gst::LoggableError> { 444 gst_debug!(CAT, obj: element, "Scanning duration"); 445 446 /* First let's query the bytes duration upstream */ 447 let mut q = gst::query::Duration::new(gst::Format::Bytes); 448 449 if !self.sinkpad.peer_query(&mut q) { 450 return Err(gst::loggable_error!( 451 CAT, 452 "Failed to query upstream duration" 453 )); 454 } 455 456 let size = match q.result().try_into().ok().flatten() { 457 Some(gst::format::Bytes(size)) => size, 458 None => { 459 return Err(gst::loggable_error!( 460 CAT, 461 "Failed to query upstream duration" 462 )); 463 } 464 }; 465 466 let mut offset = size; 467 let mut buffers = Vec::new(); 468 let mut last_pts = None; 469 470 loop { 471 let scan_size = cmp::min(offset, 4096); 472 473 offset -= scan_size; 474 475 match self.sinkpad.pull_range(offset, scan_size as u32) { 476 Ok(buffer) => { 477 buffers.push(buffer); 478 } 479 Err(flow) => { 480 return Err(gst::loggable_error!( 481 CAT, 482 "Failed to pull buffer while scanning duration: {:?}", 483 flow 484 )); 485 } 486 } 487 488 let mut reader = LineReader::new(); 489 490 for buf in buffers.iter().rev() { 491 let buf = buf 492 .clone() 493 .into_mapped_buffer_readable() 494 .map_err(|_| gst::loggable_error!(CAT, "Failed to map buffer readable"))?; 495 496 reader.push(buf); 497 } 498 499 while let Some(line) = reader.line_with_drain(true) { 500 if let Ok(Line::Buffer { 501 pts, 502 duration, 503 data: _data, 504 }) = serde_json::from_slice(line) 505 { 506 last_pts = pts.zip(duration).map(|(pts, duration)| pts + duration); 507 } 508 } 509 510 if last_pts.is_some() || offset == 0 { 511 gst_debug!( 512 CAT, 513 obj: element, 514 "Duration scan done, last_pts: {:?}", 515 last_pts 516 ); 517 break (Ok(last_pts)); 518 } 519 } 520 } 521 push_eos(&self, element: &super::JsonGstParse)522 fn push_eos(&self, element: &super::JsonGstParse) { 523 let mut state = self.state.lock().unwrap(); 524 525 if state.seeking { 526 state.need_flush_stop = true; 527 } 528 529 let mut events = state.create_events(element); 530 let mut eos_event = gst::event::Eos::builder(); 531 532 if let Some(seek_seqnum) = state.seek_seqnum { 533 eos_event = eos_event.seqnum(seek_seqnum); 534 } 535 536 events.push(eos_event.build()); 537 538 // Drop our state mutex while we push out events 539 drop(state); 540 541 for event in events { 542 gst_debug!(CAT, obj: element, "Pushing event {:?}", event); 543 self.srcpad.push_event(event); 544 } 545 } 546 loop_fn(&self, element: &super::JsonGstParse)547 fn loop_fn(&self, element: &super::JsonGstParse) { 548 let mut state = self.state.lock().unwrap(); 549 let State { ref mut pull, .. } = *state; 550 let mut pull = pull.as_mut().unwrap(); 551 let offset = pull.offset; 552 let scan_duration = pull.duration.is_none(); 553 554 pull.offset += 4096; 555 556 drop(state); 557 558 if scan_duration { 559 match self.scan_duration(element) { 560 Ok(pts) => { 561 let mut state = self.state.lock().unwrap(); 562 let mut pull = state.pull.as_mut().unwrap(); 563 pull.duration = pts; 564 } 565 Err(err) => { 566 err.log(); 567 568 gst::element_error!( 569 element, 570 gst::StreamError::Decode, 571 ["Failed to scan duration"] 572 ); 573 574 self.sinkpad.pause_task().unwrap(); 575 } 576 } 577 } 578 579 let buffer = match self.sinkpad.pull_range(offset, 4096) { 580 Ok(buffer) => Some(buffer), 581 Err(gst::FlowError::Eos) => None, 582 Err(gst::FlowError::Flushing) => { 583 gst_debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing"); 584 585 self.sinkpad.pause_task().unwrap(); 586 return; 587 } 588 Err(flow) => { 589 gst_error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow); 590 591 gst::element_error!( 592 element, 593 gst::StreamError::Failed, 594 ["Streaming stopped, failed to pull buffer"] 595 ); 596 597 self.sinkpad.pause_task().unwrap(); 598 return; 599 } 600 }; 601 602 if let Err(flow) = self.handle_buffer(element, buffer) { 603 match flow { 604 gst::FlowError::Flushing => { 605 gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow); 606 } 607 gst::FlowError::Eos => { 608 self.push_eos(element); 609 610 gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow); 611 } 612 _ => { 613 self.push_eos(element); 614 615 gst_error!(CAT, obj: element, "Pausing after flow {:?}", flow); 616 617 gst::element_error!( 618 element, 619 gst::StreamError::Failed, 620 ["Streaming stopped, reason: {:?}", flow] 621 ); 622 } 623 } 624 625 self.sinkpad.pause_task().unwrap(); 626 } 627 } 628 sink_chain( &self, pad: &gst::Pad, element: &super::JsonGstParse, buffer: gst::Buffer, ) -> Result<gst::FlowSuccess, gst::FlowError>629 fn sink_chain( 630 &self, 631 pad: &gst::Pad, 632 element: &super::JsonGstParse, 633 buffer: gst::Buffer, 634 ) -> Result<gst::FlowSuccess, gst::FlowError> { 635 gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); 636 637 self.handle_buffer(element, Some(buffer)) 638 } 639 flush(&self, mut state: &mut State)640 fn flush(&self, mut state: &mut State) { 641 state.reader.clear(); 642 if let Some(pull) = &mut state.pull { 643 pull.offset = 0; 644 } 645 state.segment = gst::FormattedSegment::<gst::ClockTime>::new(); 646 state.need_segment = true; 647 state.need_caps = true; 648 state.pending_events.clear(); 649 state.last_position = None; 650 state.last_raw_line = [].to_vec(); 651 state.format = None; 652 } 653 sink_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool654 fn sink_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool { 655 use gst::EventView; 656 657 gst_log!(CAT, obj: pad, "Handling event {:?}", event); 658 659 match event.view() { 660 EventView::Caps(_) => { 661 // We send a proper caps event from the chain function later 662 gst_log!(CAT, obj: pad, "Dropping caps event"); 663 true 664 } 665 EventView::Segment(_) => { 666 // We send a gst::Format::Time segment event later when needed 667 gst_log!(CAT, obj: pad, "Dropping segment event"); 668 true 669 } 670 EventView::FlushStop(_) => { 671 let mut state = self.state.lock().unwrap(); 672 self.flush(&mut state); 673 drop(state); 674 675 pad.event_default(Some(element), event) 676 } 677 EventView::Eos(_) => { 678 gst_log!(CAT, obj: pad, "Draining"); 679 if let Err(err) = self.handle_buffer(element, None) { 680 gst_error!(CAT, obj: pad, "Failed to drain parser: {:?}", err); 681 } 682 pad.event_default(Some(element), event) 683 } 684 _ => { 685 if event.is_sticky() 686 && !self.srcpad.has_current_caps() 687 && event.type_() > gst::EventType::Caps 688 { 689 gst_log!(CAT, obj: pad, "Deferring sticky event until we have caps"); 690 let mut state = self.state.lock().unwrap(); 691 state.pending_events.push(event); 692 true 693 } else { 694 pad.event_default(Some(element), event) 695 } 696 } 697 } 698 } 699 perform_seek(&self, event: &gst::event::Seek, element: &super::JsonGstParse) -> bool700 fn perform_seek(&self, event: &gst::event::Seek, element: &super::JsonGstParse) -> bool { 701 if self.state.lock().unwrap().pull.is_none() { 702 gst_error!(CAT, obj: element, "seeking is only supported in pull mode"); 703 return false; 704 } 705 706 let (rate, flags, start_type, start, stop_type, stop) = event.get(); 707 708 let mut start: Option<gst::ClockTime> = match start.try_into() { 709 Ok(start) => start, 710 Err(_) => { 711 gst_error!(CAT, obj: element, "seek has invalid format"); 712 return false; 713 } 714 }; 715 716 let mut stop: Option<gst::ClockTime> = match stop.try_into() { 717 Ok(stop) => stop, 718 Err(_) => { 719 gst_error!(CAT, obj: element, "seek has invalid format"); 720 return false; 721 } 722 }; 723 724 if !flags.contains(gst::SeekFlags::FLUSH) { 725 gst_error!(CAT, obj: element, "only flushing seeks are supported"); 726 return false; 727 } 728 729 if start_type == gst::SeekType::End || stop_type == gst::SeekType::End { 730 gst_error!(CAT, obj: element, "Relative seeks are not supported"); 731 return false; 732 } 733 734 let seek_seqnum = event.seqnum(); 735 736 let event = gst::event::FlushStart::builder() 737 .seqnum(seek_seqnum) 738 .build(); 739 740 gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event); 741 self.sinkpad.push_event(event); 742 743 let event = gst::event::FlushStart::builder() 744 .seqnum(seek_seqnum) 745 .build(); 746 747 gst_debug!(CAT, obj: element, "Pushing event {:?}", event); 748 self.srcpad.push_event(event); 749 750 self.sinkpad.pause_task().unwrap(); 751 752 let mut state = self.state.lock().unwrap(); 753 let pull = state.pull.as_ref().unwrap(); 754 755 if start_type == gst::SeekType::Set { 756 start = start 757 .zip(pull.duration) 758 .map(|(start, duration)| start.min(duration)) 759 .or(start); 760 } 761 762 if stop_type == gst::SeekType::Set { 763 stop = stop 764 .zip(pull.duration) 765 .map(|(stop, duration)| stop.min(duration)) 766 .or(stop); 767 } 768 769 state.seeking = true; 770 state.seek_seqnum = Some(seek_seqnum); 771 772 self.flush(&mut state); 773 774 let event = gst::event::FlushStop::builder(true) 775 .seqnum(seek_seqnum) 776 .build(); 777 778 /* Drop our state while we push a serialized event upstream */ 779 drop(state); 780 781 gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event); 782 self.sinkpad.push_event(event); 783 784 state = self.state.lock().unwrap(); 785 786 state 787 .segment 788 .do_seek(rate, flags, start_type, start, stop_type, stop); 789 790 match self.start_task(element) { 791 Err(error) => { 792 error.log(); 793 false 794 } 795 _ => true, 796 } 797 } 798 src_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool799 fn src_event(&self, pad: &gst::Pad, element: &super::JsonGstParse, event: gst::Event) -> bool { 800 use gst::EventView; 801 802 gst_log!(CAT, obj: pad, "Handling event {:?}", event); 803 match event.view() { 804 EventView::Seek(e) => self.perform_seek(&e, element), 805 _ => pad.event_default(Some(element), event), 806 } 807 } 808 src_query( &self, pad: &gst::Pad, element: &super::JsonGstParse, query: &mut gst::QueryRef, ) -> bool809 fn src_query( 810 &self, 811 pad: &gst::Pad, 812 element: &super::JsonGstParse, 813 query: &mut gst::QueryRef, 814 ) -> bool { 815 use gst::QueryView; 816 817 gst_log!(CAT, obj: pad, "Handling query {:?}", query); 818 819 match query.view_mut() { 820 QueryView::Seeking(mut q) => { 821 let state = self.state.lock().unwrap(); 822 823 let fmt = q.format(); 824 825 if fmt == gst::Format::Time { 826 if let Some(pull) = state.pull.as_ref() { 827 q.set( 828 true, 829 gst::GenericFormattedValue::Time(Some(gst::ClockTime::ZERO)), 830 gst::GenericFormattedValue::Time(pull.duration), 831 ); 832 true 833 } else { 834 false 835 } 836 } else { 837 false 838 } 839 } 840 QueryView::Position(ref mut q) => { 841 // For Time answer ourselfs, otherwise forward 842 if q.format() == gst::Format::Time { 843 let state = self.state.lock().unwrap(); 844 q.set(state.last_position); 845 true 846 } else { 847 self.sinkpad.peer_query(query) 848 } 849 } 850 QueryView::Duration(ref mut q) => { 851 // For Time answer ourselfs, otherwise forward 852 let state = self.state.lock().unwrap(); 853 if q.format() == gst::Format::Time { 854 if let Some(pull) = state.pull.as_ref() { 855 if pull.duration.is_some() { 856 q.set(state.pull.as_ref().unwrap().duration); 857 true 858 } else { 859 false 860 } 861 } else { 862 false 863 } 864 } else { 865 self.sinkpad.peer_query(query) 866 } 867 } 868 _ => pad.query_default(Some(element), query), 869 } 870 } 871 } 872 873 #[glib::object_subclass] 874 impl ObjectSubclass for JsonGstParse { 875 const NAME: &'static str = "RsJsonGstParse"; 876 type Type = super::JsonGstParse; 877 type ParentType = gst::Element; 878 with_class(klass: &Self::Class) -> Self879 fn with_class(klass: &Self::Class) -> Self { 880 let templ = klass.pad_template("sink").unwrap(); 881 let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) 882 .activate_function(|pad, parent| { 883 JsonGstParse::catch_panic_pad_function( 884 parent, 885 || Err(gst::loggable_error!(CAT, "Panic activating sink pad")), 886 |parse, element| parse.sink_activate(pad, element), 887 ) 888 }) 889 .activatemode_function(|pad, parent, mode, active| { 890 JsonGstParse::catch_panic_pad_function( 891 parent, 892 || { 893 Err(gst::loggable_error!( 894 CAT, 895 "Panic activating sink pad with mode" 896 )) 897 }, 898 |parse, element| parse.sink_activatemode(pad, element, mode, active), 899 ) 900 }) 901 .chain_function(|pad, parent, buffer| { 902 JsonGstParse::catch_panic_pad_function( 903 parent, 904 || Err(gst::FlowError::Error), 905 |parse, element| parse.sink_chain(pad, element, buffer), 906 ) 907 }) 908 .event_function(|pad, parent, event| { 909 JsonGstParse::catch_panic_pad_function( 910 parent, 911 || false, 912 |parse, element| parse.sink_event(pad, element, event), 913 ) 914 }) 915 .build(); 916 917 let templ = klass.pad_template("src").unwrap(); 918 let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) 919 .event_function(|pad, parent, event| { 920 JsonGstParse::catch_panic_pad_function( 921 parent, 922 || false, 923 |parse, element| parse.src_event(pad, element, event), 924 ) 925 }) 926 .query_function(|pad, parent, query| { 927 JsonGstParse::catch_panic_pad_function( 928 parent, 929 || false, 930 |parse, element| parse.src_query(pad, element, query), 931 ) 932 }) 933 .build(); 934 935 Self { 936 srcpad, 937 sinkpad, 938 state: Mutex::new(State::default()), 939 } 940 } 941 } 942 943 impl ObjectImpl for JsonGstParse { constructed(&self, obj: &Self::Type)944 fn constructed(&self, obj: &Self::Type) { 945 self.parent_constructed(obj); 946 947 obj.add_pad(&self.sinkpad).unwrap(); 948 obj.add_pad(&self.srcpad).unwrap(); 949 } 950 } 951 952 impl ElementImpl for JsonGstParse { metadata() -> Option<&'static gst::subclass::ElementMetadata>953 fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { 954 static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { 955 gst::subclass::ElementMetadata::new( 956 "JSON GStreamer parser", 957 "Parser/JSON", 958 "Parses ndjson as output by jsongstenc", 959 "Mathieu Duponchelle <mathieu@centricular.com>", 960 ) 961 }); 962 963 Some(&*ELEMENT_METADATA) 964 } 965 pad_templates() -> &'static [gst::PadTemplate]966 fn pad_templates() -> &'static [gst::PadTemplate] { 967 static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { 968 let caps = gst::Caps::builder("application/x-json").build(); 969 let src_pad_template = gst::PadTemplate::new( 970 "src", 971 gst::PadDirection::Src, 972 gst::PadPresence::Always, 973 &caps, 974 ) 975 .unwrap(); 976 977 let caps = gst::Caps::new_any(); 978 let sink_pad_template = gst::PadTemplate::new( 979 "sink", 980 gst::PadDirection::Sink, 981 gst::PadPresence::Always, 982 &caps, 983 ) 984 .unwrap(); 985 986 vec![src_pad_template, sink_pad_template] 987 }); 988 989 PAD_TEMPLATES.as_ref() 990 } 991 change_state( &self, element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError>992 fn change_state( 993 &self, 994 element: &Self::Type, 995 transition: gst::StateChange, 996 ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { 997 gst_trace!(CAT, obj: element, "Changing state {:?}", transition); 998 999 match transition { 1000 gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => { 1001 // Reset the whole state 1002 let mut state = self.state.lock().unwrap(); 1003 *state = State::default(); 1004 } 1005 _ => (), 1006 } 1007 1008 self.parent_change_state(element, transition) 1009 } 1010 } 1011