1 // Copyright (C) 2020 Natanael Mojica <neithanmo@gmail.com> 2 // 3 // Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or 4 // http://www.apache.org/licenses/LICENSE-2.0> or the MIT license 5 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your 6 // option. This file may not be copied, modified, or distributed 7 // except according to those terms. 8 9 use std::{io, io::Write, sync::Arc}; 10 11 use gst::glib; 12 use gst::prelude::*; 13 use gst::subclass::prelude::*; 14 use gst::{gst_debug, gst_error}; 15 use gst_video::prelude::*; 16 use gst_video::subclass::prelude::*; 17 18 use atomic_refcell::AtomicRefCell; 19 use once_cell::sync::Lazy; 20 use parking_lot::Mutex; 21 22 use super::CompressionLevel; 23 use super::FilterType; 24 25 const DEFAULT_COMPRESSION_LEVEL: CompressionLevel = CompressionLevel::Default; 26 const DEFAULT_FILTER_TYPE: FilterType = FilterType::NoFilter; 27 28 static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { 29 gst::DebugCategory::new( 30 "rspngenc", 31 gst::DebugColorFlags::empty(), 32 Some("PNG encoder"), 33 ) 34 }); 35 36 // Inner buffer where the result of frame encoding is written 37 // before relay them downstream 38 struct CacheBuffer { 39 buffer: AtomicRefCell<Vec<u8>>, 40 } 41 42 impl CacheBuffer { new() -> Self43 pub fn new() -> Self { 44 Self { 45 buffer: AtomicRefCell::new(Vec::new()), 46 } 47 } 48 clear(&self)49 pub fn clear(&self) { 50 self.buffer.borrow_mut().clear(); 51 } 52 write(&self, buf: &[u8])53 pub fn write(&self, buf: &[u8]) { 54 let mut buffer = self.buffer.borrow_mut(); 55 buffer.extend_from_slice(buf); 56 } 57 consume(&self) -> Vec<u8>58 pub fn consume(&self) -> Vec<u8> { 59 let mut buffer = self.buffer.borrow_mut(); 60 std::mem::take(&mut *buffer) 61 } 62 } 63 // The Encoder requires a Writer, so we use here and intermediate structure 64 // for caching encoded frames 65 struct CacheWriter { 66 cache: Arc<CacheBuffer>, 67 } 68 69 impl CacheWriter { new(cache: Arc<CacheBuffer>) -> Self70 pub fn new(cache: Arc<CacheBuffer>) -> Self { 71 Self { cache } 72 } 73 } 74 75 impl Write for CacheWriter { write(&mut self, buf: &[u8]) -> io::Result<usize>76 fn write(&mut self, buf: &[u8]) -> io::Result<usize> { 77 self.cache.write(buf); 78 Ok(buf.len()) 79 } 80 flush(&mut self) -> io::Result<()>81 fn flush(&mut self) -> io::Result<()> { 82 Ok(()) 83 } 84 } 85 86 #[derive(Debug, Clone, Copy)] 87 struct Settings { 88 compression: CompressionLevel, 89 filter: FilterType, 90 } 91 92 impl Default for Settings { default() -> Self93 fn default() -> Self { 94 Settings { 95 compression: DEFAULT_COMPRESSION_LEVEL, 96 filter: DEFAULT_FILTER_TYPE, 97 } 98 } 99 } 100 101 struct State { 102 video_info: gst_video::VideoInfo, 103 cache: Arc<CacheBuffer>, 104 writer: Option<png::Writer<CacheWriter>>, 105 } 106 107 impl State { new(video_info: gst_video::VideoInfo) -> Self108 fn new(video_info: gst_video::VideoInfo) -> Self { 109 let cache = Arc::new(CacheBuffer::new()); 110 Self { 111 video_info, 112 cache, 113 writer: None, 114 } 115 } 116 reset(&mut self, settings: Settings) -> Result<(), gst::LoggableError>117 fn reset(&mut self, settings: Settings) -> Result<(), gst::LoggableError> { 118 // clear the cache 119 self.cache.clear(); 120 let width = self.video_info.width(); 121 let height = self.video_info.height(); 122 let mut encoder = png::Encoder::new(CacheWriter::new(self.cache.clone()), width, height); 123 let color = match self.video_info.format() { 124 gst_video::VideoFormat::Gray8 | gst_video::VideoFormat::Gray16Be => { 125 png::ColorType::Grayscale 126 } 127 gst_video::VideoFormat::Rgb => png::ColorType::Rgb, 128 gst_video::VideoFormat::Rgba => png::ColorType::Rgba, 129 _ => { 130 gst_error!(CAT, "format is not supported yet"); 131 unreachable!() 132 } 133 }; 134 let depth = if self.video_info.format() == gst_video::VideoFormat::Gray16Be { 135 png::BitDepth::Sixteen 136 } else { 137 png::BitDepth::Eight 138 }; 139 140 encoder.set_color(color); 141 encoder.set_depth(depth); 142 encoder.set_compression(png::Compression::from(settings.compression)); 143 encoder.set_filter(png::FilterType::from(settings.filter)); 144 // Write the header for this video format into our inner buffer 145 let writer = encoder.write_header().map_err(|e| { 146 gst::loggable_error!(CAT, "Failed to create encoder error: {}", e.to_string()) 147 })?; 148 self.writer = Some(writer); 149 Ok(()) 150 } 151 write_data(&mut self, data: &[u8]) -> Result<(), png::EncodingError>152 fn write_data(&mut self, data: &[u8]) -> Result<(), png::EncodingError> { 153 if let Some(writer) = self.writer.as_mut() { 154 writer.write_image_data(data) 155 } else { 156 unreachable!() 157 } 158 } 159 } 160 161 #[derive(Default)] 162 pub struct PngEncoder { 163 state: Mutex<Option<State>>, 164 settings: Mutex<Settings>, 165 } 166 167 #[glib::object_subclass] 168 impl ObjectSubclass for PngEncoder { 169 const NAME: &'static str = "PngEncoder"; 170 type Type = super::PngEncoder; 171 type ParentType = gst_video::VideoEncoder; 172 } 173 174 impl ObjectImpl for PngEncoder { properties() -> &'static [glib::ParamSpec]175 fn properties() -> &'static [glib::ParamSpec] { 176 static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { 177 vec![ 178 glib::ParamSpec::new_enum( 179 "compression-level", 180 "Compression level", 181 "Selects the compression algorithm to use", 182 CompressionLevel::static_type(), 183 DEFAULT_COMPRESSION_LEVEL as i32, 184 glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, 185 ), 186 glib::ParamSpec::new_enum( 187 "filter", 188 "Filter", 189 "Selects the filter type to applied", 190 FilterType::static_type(), 191 DEFAULT_FILTER_TYPE as i32, 192 glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, 193 ), 194 ] 195 }); 196 197 PROPERTIES.as_ref() 198 } 199 set_property( &self, _obj: &Self::Type, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec, )200 fn set_property( 201 &self, 202 _obj: &Self::Type, 203 _id: usize, 204 value: &glib::Value, 205 pspec: &glib::ParamSpec, 206 ) { 207 match pspec.name() { 208 "compression-level" => { 209 let mut settings = self.settings.lock(); 210 settings.compression = value 211 .get::<CompressionLevel>() 212 .expect("type checked upstream"); 213 } 214 "filter" => { 215 let mut settings = self.settings.lock(); 216 settings.filter = value.get::<FilterType>().expect("type checked upstream"); 217 } 218 _ => unreachable!(), 219 } 220 } 221 property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value222 fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { 223 match pspec.name() { 224 "compression-level" => { 225 let settings = self.settings.lock(); 226 settings.compression.to_value() 227 } 228 "filter" => { 229 let settings = self.settings.lock(); 230 settings.filter.to_value() 231 } 232 _ => unimplemented!(), 233 } 234 } 235 } 236 237 impl ElementImpl for PngEncoder { metadata() -> Option<&'static gst::subclass::ElementMetadata>238 fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { 239 static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { 240 gst::subclass::ElementMetadata::new( 241 "PNG encoder", 242 "Encoder/Video", 243 "PNG encoder", 244 "Natanael Mojica <neithanmo@gmail>", 245 ) 246 }); 247 248 Some(&*ELEMENT_METADATA) 249 } 250 pad_templates() -> &'static [gst::PadTemplate]251 fn pad_templates() -> &'static [gst::PadTemplate] { 252 static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { 253 let sink_caps = gst::Caps::new_simple( 254 "video/x-raw", 255 &[ 256 ( 257 "format", 258 &gst::List::new(&[ 259 &gst_video::VideoFormat::Gray8.to_str(), 260 &gst_video::VideoFormat::Gray16Be.to_str(), 261 &gst_video::VideoFormat::Rgb.to_str(), 262 &gst_video::VideoFormat::Rgba.to_str(), 263 ]), 264 ), 265 ("width", &gst::IntRange::<i32>::new(1, std::i32::MAX)), 266 ("height", &gst::IntRange::<i32>::new(1, std::i32::MAX)), 267 ( 268 "framerate", 269 &gst::FractionRange::new( 270 gst::Fraction::new(1, 1), 271 gst::Fraction::new(std::i32::MAX, 1), 272 ), 273 ), 274 ], 275 ); 276 let sink_pad_template = gst::PadTemplate::new( 277 "sink", 278 gst::PadDirection::Sink, 279 gst::PadPresence::Always, 280 &sink_caps, 281 ) 282 .unwrap(); 283 284 let src_caps = gst::Caps::new_simple("image/png", &[]); 285 let src_pad_template = gst::PadTemplate::new( 286 "src", 287 gst::PadDirection::Src, 288 gst::PadPresence::Always, 289 &src_caps, 290 ) 291 .unwrap(); 292 293 vec![sink_pad_template, src_pad_template] 294 }); 295 296 PAD_TEMPLATES.as_ref() 297 } 298 } 299 300 impl VideoEncoderImpl for PngEncoder { stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage>301 fn stop(&self, _element: &Self::Type) -> Result<(), gst::ErrorMessage> { 302 *self.state.lock() = None; 303 Ok(()) 304 } 305 set_format( &self, element: &Self::Type, state: &gst_video::VideoCodecState<'static, gst_video::video_codec_state::Readable>, ) -> Result<(), gst::LoggableError>306 fn set_format( 307 &self, 308 element: &Self::Type, 309 state: &gst_video::VideoCodecState<'static, gst_video::video_codec_state::Readable>, 310 ) -> Result<(), gst::LoggableError> { 311 let video_info = state.info(); 312 gst_debug!(CAT, obj: element, "Setting format {:?}", video_info); 313 { 314 let settings = self.settings.lock(); 315 let mut state = State::new(video_info); 316 state.reset(*settings)?; 317 *self.state.lock() = Some(state); 318 } 319 320 let output_state = element 321 .set_output_state(gst::Caps::new_simple("image/png", &[]), Some(state)) 322 .map_err(|_| gst::loggable_error!(CAT, "Failed to set output state"))?; 323 element 324 .negotiate(output_state) 325 .map_err(|_| gst::loggable_error!(CAT, "Failed to negotiate")) 326 } 327 handle_frame( &self, element: &Self::Type, mut frame: gst_video::VideoCodecFrame, ) -> Result<gst::FlowSuccess, gst::FlowError>328 fn handle_frame( 329 &self, 330 element: &Self::Type, 331 mut frame: gst_video::VideoCodecFrame, 332 ) -> Result<gst::FlowSuccess, gst::FlowError> { 333 let mut state_guard = self.state.lock(); 334 let state = state_guard.as_mut().ok_or(gst::FlowError::NotNegotiated)?; 335 336 // FIXME: https://github.com/image-rs/image-png/issues/301 337 { 338 let settings = self.settings.lock(); 339 state.reset(*settings).map_err(|err| { 340 err.log_with_object(element); 341 gst::FlowError::Error 342 })?; 343 } 344 345 gst_debug!( 346 CAT, 347 obj: element, 348 "Sending frame {}", 349 frame.system_frame_number() 350 ); 351 { 352 let input_buffer = frame.input_buffer().expect("frame without input buffer"); 353 354 let input_map = input_buffer.map_readable().unwrap(); 355 let data = input_map.as_slice(); 356 state.write_data(data).map_err(|e| { 357 gst::element_error!(element, gst::CoreError::Failed, [&e.to_string()]); 358 gst::FlowError::Error 359 })?; 360 } 361 362 let buffer = state.cache.consume(); 363 drop(state_guard); 364 365 let output_buffer = gst::Buffer::from_mut_slice(buffer); 366 // There are no such incremental frames in the png format 367 frame.set_flags(gst_video::VideoCodecFrameFlags::SYNC_POINT); 368 frame.set_output_buffer(output_buffer); 369 element.finish_frame(Some(frame)) 370 } 371 } 372