1 // encrypter.rs 2 // 3 // Copyright 2019 Jordan Petridis <jordan@centricular.com> 4 // 5 // Permission is hereby granted, free of charge, to any person obtaining a copy 6 // of this software and associated documentation files (the "Software"), to 7 // deal in the Software without restriction, including without limitation the 8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 9 // sell copies of the Software, and to permit persons to whom the Software is 10 // furnished to do so, subject to the following conditions: 11 // 12 // The above copyright notice and this permission notice shall be included in 13 // all copies or substantial portions of the Software. 14 // 15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 20 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 21 // IN THE SOFTWARE. 22 // 23 // SPDX-License-Identifier: MIT 24 25 use gst::glib; 26 use gst::prelude::*; 27 use gst::subclass::prelude::*; 28 use gst::{gst_debug, gst_error, gst_log}; 29 use smallvec::SmallVec; 30 use sodiumoxide::crypto::box_; 31 32 type BufferVec = SmallVec<[gst::Buffer; 16]>; 33 34 use std::sync::Mutex; 35 36 use once_cell::sync::Lazy; 37 static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { 38 gst::DebugCategory::new( 39 "sodiumencrypter", 40 gst::DebugColorFlags::empty(), 41 Some("Encrypter Element"), 42 ) 43 }); 44 45 #[derive(Debug, Clone)] 46 struct Props { 47 receiver_key: Option<glib::Bytes>, 48 sender_key: Option<glib::Bytes>, 49 block_size: u32, 50 } 51 52 impl Default for Props { default() -> Self53 fn default() -> Self { 54 Props { 55 receiver_key: None, 56 sender_key: None, 57 block_size: 32768, 58 } 59 } 60 } 61 62 #[derive(Debug)] 63 struct State { 64 adapter: gst_base::UniqueAdapter, 65 nonce: box_::Nonce, 66 precomputed_key: box_::PrecomputedKey, 67 block_size: u32, 68 write_headers: bool, 69 } 70 71 impl State { from_props(props: &Props) -> Result<Self, gst::ErrorMessage>72 fn from_props(props: &Props) -> Result<Self, gst::ErrorMessage> { 73 let sender_key = props 74 .sender_key 75 .as_ref() 76 .and_then(|k| box_::SecretKey::from_slice(k)) 77 .ok_or_else(|| { 78 gst::error_msg!( 79 gst::ResourceError::NotFound, 80 [format!( 81 "Failed to set Sender's Key from property: {:?}", 82 props.sender_key 83 ) 84 .as_ref()] 85 ) 86 })?; 87 88 let receiver_key = props 89 .receiver_key 90 .as_ref() 91 .and_then(|k| box_::PublicKey::from_slice(k)) 92 .ok_or_else(|| { 93 gst::error_msg!( 94 gst::ResourceError::NotFound, 95 [format!( 96 "Failed to set Receiver's Key from property: {:?}", 97 props.receiver_key 98 ) 99 .as_ref()] 100 ) 101 })?; 102 103 // This env variable is only meant to bypass nonce regeneration during 104 // tests to get determinisic results. It should never be used outside 105 // of testing environments. 106 let nonce = if let Ok(val) = std::env::var("GST_SODIUM_ENCRYPT_NONCE") { 107 let bytes = hex::decode(val).expect("Failed to decode hex variable"); 108 assert_eq!(bytes.len(), box_::NONCEBYTES); 109 box_::Nonce::from_slice(&bytes).unwrap() 110 } else { 111 box_::gen_nonce() 112 }; 113 114 let precomputed_key = box_::precompute(&receiver_key, &sender_key); 115 116 Ok(Self { 117 adapter: gst_base::UniqueAdapter::new(), 118 precomputed_key, 119 nonce, 120 block_size: props.block_size, 121 write_headers: true, 122 }) 123 } 124 seal(&mut self, message: &[u8]) -> Vec<u8>125 fn seal(&mut self, message: &[u8]) -> Vec<u8> { 126 let ciphertext = box_::seal_precomputed(message, &self.nonce, &self.precomputed_key); 127 self.nonce.increment_le_inplace(); 128 ciphertext 129 } 130 encrypt_message(&mut self, buffer: &gst::BufferRef) -> gst::Buffer131 fn encrypt_message(&mut self, buffer: &gst::BufferRef) -> gst::Buffer { 132 let map = buffer 133 .map_readable() 134 .expect("Failed to map buffer readable"); 135 136 let sealed = self.seal(&map); 137 gst::Buffer::from_mut_slice(sealed) 138 } 139 encrypt_blocks(&mut self, block_size: usize) -> BufferVec140 fn encrypt_blocks(&mut self, block_size: usize) -> BufferVec { 141 assert_ne!(block_size, 0); 142 143 let mut buffers = BufferVec::new(); 144 145 // As long we have enough bytes to encrypt a block, or more, we do so 146 // else the leftover bytes on the adapter will be pushed when EOS 147 // is sent. 148 while self.adapter.available() >= block_size { 149 let buffer = self.adapter.take_buffer(block_size).unwrap(); 150 let out_buf = self.encrypt_message(&buffer); 151 152 buffers.push(out_buf); 153 } 154 155 buffers 156 } 157 } 158 159 pub struct Encrypter { 160 srcpad: gst::Pad, 161 sinkpad: gst::Pad, 162 props: Mutex<Props>, 163 state: Mutex<Option<State>>, 164 } 165 166 impl Encrypter { sink_chain( &self, pad: &gst::Pad, element: &super::Encrypter, buffer: gst::Buffer, ) -> Result<gst::FlowSuccess, gst::FlowError>167 fn sink_chain( 168 &self, 169 pad: &gst::Pad, 170 element: &super::Encrypter, 171 buffer: gst::Buffer, 172 ) -> Result<gst::FlowSuccess, gst::FlowError> { 173 gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); 174 175 let mut buffers = BufferVec::new(); 176 let mut state_guard = self.state.lock().unwrap(); 177 let state = state_guard.as_mut().unwrap(); 178 179 if state.write_headers { 180 let mut headers = Vec::with_capacity(40); 181 headers.extend_from_slice(crate::TYPEFIND_HEADER); 182 // Write the Nonce used into the stream. 183 headers.extend_from_slice(state.nonce.as_ref()); 184 // Write the block_size into the stream 185 headers.extend_from_slice(&state.block_size.to_le_bytes()); 186 187 buffers.push(gst::Buffer::from_mut_slice(headers)); 188 state.write_headers = false; 189 } 190 191 state.adapter.push(buffer); 192 193 // Encrypt the whole blocks, if any, and push them. 194 buffers.extend(state.encrypt_blocks(state.block_size as usize)); 195 196 drop(state_guard); 197 198 for buffer in buffers { 199 self.srcpad.push(buffer).map_err(|err| { 200 gst_error!(CAT, obj: element, "Failed to push buffer {:?}", err); 201 err 202 })?; 203 } 204 205 Ok(gst::FlowSuccess::Ok) 206 } 207 sink_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool208 fn sink_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool { 209 use gst::EventView; 210 211 gst_log!(CAT, obj: pad, "Handling event {:?}", event); 212 213 match event.view() { 214 EventView::Caps(_) => { 215 // We send our own caps downstream 216 let caps = gst::Caps::builder("application/x-sodium-encrypted").build(); 217 self.srcpad.push_event(gst::event::Caps::new(&caps)) 218 } 219 EventView::Eos(_) => { 220 let mut state_mutex = self.state.lock().unwrap(); 221 let mut buffers = BufferVec::new(); 222 // This will only be run after READY state, 223 // and will be guaranted to be initialized 224 let state = state_mutex.as_mut().unwrap(); 225 226 // Now that all the full size blocks are pushed, drain the 227 // rest of the adapter and push whatever is left. 228 let avail = state.adapter.available(); 229 // logic error, all the complete blocks that can be pushed 230 // should have been done in the sink_chain call. 231 assert!(avail < state.block_size as usize); 232 233 if avail > 0 { 234 let b = state.encrypt_blocks(avail); 235 buffers.extend(b); 236 } 237 238 // drop the lock before pushing into the pad 239 drop(state_mutex); 240 241 for buffer in buffers { 242 if let Err(err) = self.srcpad.push(buffer) { 243 gst_error!(CAT, obj: element, "Failed to push buffer at EOS {:?}", err); 244 return false; 245 } 246 } 247 248 pad.event_default(Some(element), event) 249 } 250 _ => pad.event_default(Some(element), event), 251 } 252 } 253 src_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool254 fn src_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool { 255 use gst::EventView; 256 257 gst_log!(CAT, obj: pad, "Handling event {:?}", event); 258 259 match event.view() { 260 EventView::Seek(_) => false, 261 _ => pad.event_default(Some(element), event), 262 } 263 } 264 src_query( &self, pad: &gst::Pad, element: &super::Encrypter, query: &mut gst::QueryRef, ) -> bool265 fn src_query( 266 &self, 267 pad: &gst::Pad, 268 element: &super::Encrypter, 269 query: &mut gst::QueryRef, 270 ) -> bool { 271 use gst::QueryView; 272 273 gst_log!(CAT, obj: pad, "Handling query {:?}", query); 274 275 match query.view_mut() { 276 QueryView::Seeking(mut q) => { 277 let format = q.format(); 278 q.set( 279 false, 280 gst::GenericFormattedValue::Other(format, -1), 281 gst::GenericFormattedValue::Other(format, -1), 282 ); 283 gst_log!(CAT, obj: pad, "Returning {:?}", q.query_mut()); 284 true 285 } 286 QueryView::Duration(ref mut q) => { 287 use std::convert::TryInto; 288 289 if q.format() != gst::Format::Bytes { 290 return pad.query_default(Some(element), query); 291 } 292 293 /* First let's query the bytes duration upstream */ 294 let mut peer_query = gst::query::Duration::new(gst::Format::Bytes); 295 296 if !self.sinkpad.peer_query(&mut peer_query) { 297 gst_error!(CAT, "Failed to query upstream duration"); 298 return false; 299 } 300 301 let size = match peer_query.result().try_into().unwrap() { 302 Some(gst::format::Bytes(size)) => size, 303 None => { 304 gst_error!(CAT, "Failed to query upstream duration"); 305 306 return false; 307 } 308 }; 309 310 let state = self.state.lock().unwrap(); 311 let state = match state.as_ref() { 312 // If state isn't set, it means that the 313 // element hasn't been activated yet. 314 None => return false, 315 Some(s) => s, 316 }; 317 318 // calculate the number of chunks that exist in the stream 319 let total_chunks = (size + state.block_size as u64 - 1) / state.block_size as u64; 320 // add the MAC of each block 321 let size = size + total_chunks * box_::MACBYTES as u64; 322 323 // add static offsets 324 let size = size + crate::HEADERS_SIZE as u64; 325 326 gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size); 327 q.set(gst::format::Bytes(size)); 328 329 true 330 } 331 _ => pad.query_default(Some(element), query), 332 } 333 } 334 } 335 336 #[glib::object_subclass] 337 impl ObjectSubclass for Encrypter { 338 const NAME: &'static str = "RsSodiumEncrypter"; 339 type Type = super::Encrypter; 340 type ParentType = gst::Element; 341 with_class(klass: &Self::Class) -> Self342 fn with_class(klass: &Self::Class) -> Self { 343 let templ = klass.pad_template("sink").unwrap(); 344 let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) 345 .chain_function(|pad, parent, buffer| { 346 Encrypter::catch_panic_pad_function( 347 parent, 348 || Err(gst::FlowError::Error), 349 |encrypter, element| encrypter.sink_chain(pad, element, buffer), 350 ) 351 }) 352 .event_function(|pad, parent, event| { 353 Encrypter::catch_panic_pad_function( 354 parent, 355 || false, 356 |encrypter, element| encrypter.sink_event(pad, element, event), 357 ) 358 }) 359 .build(); 360 361 let templ = klass.pad_template("src").unwrap(); 362 let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) 363 .query_function(|pad, parent, query| { 364 Encrypter::catch_panic_pad_function( 365 parent, 366 || false, 367 |encrypter, element| encrypter.src_query(pad, element, query), 368 ) 369 }) 370 .event_function(|pad, parent, event| { 371 Encrypter::catch_panic_pad_function( 372 parent, 373 || false, 374 |encrypter, element| encrypter.src_event(pad, element, event), 375 ) 376 }) 377 .build(); 378 379 let props = Mutex::new(Props::default()); 380 let state = Mutex::new(None); 381 382 Self { 383 srcpad, 384 sinkpad, 385 props, 386 state, 387 } 388 } 389 } 390 391 impl ObjectImpl for Encrypter { properties() -> &'static [glib::ParamSpec]392 fn properties() -> &'static [glib::ParamSpec] { 393 static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| { 394 vec![ 395 glib::ParamSpec::new_boxed( 396 "receiver-key", 397 "Receiver Key", 398 "The public key of the Receiver", 399 glib::Bytes::static_type(), 400 glib::ParamFlags::READWRITE, 401 ), 402 glib::ParamSpec::new_boxed( 403 "sender-key", 404 "Sender Key", 405 "The private key of the Sender", 406 glib::Bytes::static_type(), 407 glib::ParamFlags::WRITABLE, 408 ), 409 glib::ParamSpec::new_uint( 410 "block-size", 411 "Block Size", 412 "The block-size of the chunks", 413 1024, 414 std::u32::MAX, 415 32768, 416 glib::ParamFlags::READWRITE, 417 ), 418 ] 419 }); 420 421 PROPERTIES.as_ref() 422 } 423 constructed(&self, obj: &Self::Type)424 fn constructed(&self, obj: &Self::Type) { 425 self.parent_constructed(obj); 426 427 obj.add_pad(&self.sinkpad).unwrap(); 428 obj.add_pad(&self.srcpad).unwrap(); 429 } 430 set_property( &self, _obj: &Self::Type, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec, )431 fn set_property( 432 &self, 433 _obj: &Self::Type, 434 _id: usize, 435 value: &glib::Value, 436 pspec: &glib::ParamSpec, 437 ) { 438 match pspec.name() { 439 "sender-key" => { 440 let mut props = self.props.lock().unwrap(); 441 props.sender_key = value.get().expect("type checked upstream"); 442 } 443 444 "receiver-key" => { 445 let mut props = self.props.lock().unwrap(); 446 props.receiver_key = value.get().expect("type checked upstream"); 447 } 448 449 "block-size" => { 450 let mut props = self.props.lock().unwrap(); 451 props.block_size = value.get().expect("type checked upstream"); 452 } 453 454 _ => unimplemented!(), 455 } 456 } 457 property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value458 fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { 459 match pspec.name() { 460 "receiver-key" => { 461 let props = self.props.lock().unwrap(); 462 props.receiver_key.to_value() 463 } 464 465 "block-size" => { 466 let props = self.props.lock().unwrap(); 467 props.block_size.to_value() 468 } 469 470 _ => unimplemented!(), 471 } 472 } 473 } 474 475 impl ElementImpl for Encrypter { metadata() -> Option<&'static gst::subclass::ElementMetadata>476 fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { 477 static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| { 478 gst::subclass::ElementMetadata::new( 479 "Encrypter", 480 "Generic", 481 "libsodium-based file encrypter", 482 "Jordan Petridis <jordan@centricular.com>", 483 ) 484 }); 485 486 Some(&*ELEMENT_METADATA) 487 } 488 pad_templates() -> &'static [gst::PadTemplate]489 fn pad_templates() -> &'static [gst::PadTemplate] { 490 static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| { 491 let src_caps = gst::Caps::builder("application/x-sodium-encrypted").build(); 492 let src_pad_template = gst::PadTemplate::new( 493 "src", 494 gst::PadDirection::Src, 495 gst::PadPresence::Always, 496 &src_caps, 497 ) 498 .unwrap(); 499 500 let sink_pad_template = gst::PadTemplate::new( 501 "sink", 502 gst::PadDirection::Sink, 503 gst::PadPresence::Always, 504 &gst::Caps::new_any(), 505 ) 506 .unwrap(); 507 508 vec![src_pad_template, sink_pad_template] 509 }); 510 511 PAD_TEMPLATES.as_ref() 512 } 513 change_state( &self, element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError>514 fn change_state( 515 &self, 516 element: &Self::Type, 517 transition: gst::StateChange, 518 ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { 519 gst_debug!(CAT, obj: element, "Changing state {:?}", transition); 520 521 match transition { 522 gst::StateChange::NullToReady => { 523 let props = self.props.lock().unwrap().clone(); 524 525 // Create an internal state struct from the provided properties or 526 // refuse to change state 527 let state_ = State::from_props(&props).map_err(|err| { 528 element.post_error_message(err); 529 gst::StateChangeError 530 })?; 531 532 let mut state = self.state.lock().unwrap(); 533 *state = Some(state_); 534 } 535 gst::StateChange::ReadyToNull => { 536 let _ = self.state.lock().unwrap().take(); 537 } 538 _ => (), 539 } 540 541 let success = self.parent_change_state(element, transition)?; 542 543 if transition == gst::StateChange::ReadyToNull { 544 let _ = self.state.lock().unwrap().take(); 545 } 546 547 Ok(success) 548 } 549 } 550