1 // encrypter.rs
2 //
3 // Copyright 2019 Jordan Petridis <jordan@centricular.com>
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // The above copyright notice and this permission notice shall be included in
13 // all copies or substantial portions of the Software.
14 //
15 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
20 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
21 // IN THE SOFTWARE.
22 //
23 // SPDX-License-Identifier: MIT
24 
25 use gst::glib;
26 use gst::prelude::*;
27 use gst::subclass::prelude::*;
28 use gst::{gst_debug, gst_error, gst_log};
29 use smallvec::SmallVec;
30 use sodiumoxide::crypto::box_;
31 
32 type BufferVec = SmallVec<[gst::Buffer; 16]>;
33 
34 use std::sync::Mutex;
35 
36 use once_cell::sync::Lazy;
// Debug category used by the logging macros in this file. It is created
// lazily on first use and registered under the name "sodiumencrypter".
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
    gst::DebugCategory::new(
        "sodiumencrypter",
        gst::DebugColorFlags::empty(),
        Some("Encrypter Element"),
    )
});
44 
/// Settings of the element, written through the `receiver-key`,
/// `sender-key` and `block-size` properties.
///
/// NOTE(review): the derived `Debug` output includes the raw bytes of
/// `sender_key`, so avoid formatting a whole `Props` into logs or messages.
#[derive(Debug, Clone)]
struct Props {
    /// Raw bytes of the receiver's public key; `None` until the property is set.
    receiver_key: Option<glib::Bytes>,
    /// Raw bytes of the sender's private key; `None` until the property is set.
    sender_key: Option<glib::Bytes>,
    /// Number of input bytes that are sealed together as one block.
    block_size: u32,
}
51 
52 impl Default for Props {
default() -> Self53     fn default() -> Self {
54         Props {
55             receiver_key: None,
56             sender_key: None,
57             block_size: 32768,
58         }
59     }
60 }
61 
/// Per-stream encryption state, built from [`Props`] by `State::from_props`.
#[derive(Debug)]
struct State {
    /// Collects incoming buffers until at least one full block is available.
    adapter: gst_base::UniqueAdapter,
    /// Nonce used for the next sealed block; incremented after every seal.
    nonce: box_::Nonce,
    /// Key precomputed from the receiver's public and the sender's secret key.
    precomputed_key: box_::PrecomputedKey,
    /// Number of input bytes sealed per block.
    block_size: u32,
    /// `true` until the stream header buffer has been queued for output.
    write_headers: bool,
}
70 
71 impl State {
from_props(props: &Props) -> Result<Self, gst::ErrorMessage>72     fn from_props(props: &Props) -> Result<Self, gst::ErrorMessage> {
73         let sender_key = props
74             .sender_key
75             .as_ref()
76             .and_then(|k| box_::SecretKey::from_slice(k))
77             .ok_or_else(|| {
78                 gst::error_msg!(
79                     gst::ResourceError::NotFound,
80                     [format!(
81                         "Failed to set Sender's Key from property: {:?}",
82                         props.sender_key
83                     )
84                     .as_ref()]
85                 )
86             })?;
87 
88         let receiver_key = props
89             .receiver_key
90             .as_ref()
91             .and_then(|k| box_::PublicKey::from_slice(k))
92             .ok_or_else(|| {
93                 gst::error_msg!(
94                     gst::ResourceError::NotFound,
95                     [format!(
96                         "Failed to set Receiver's Key from property: {:?}",
97                         props.receiver_key
98                     )
99                     .as_ref()]
100                 )
101             })?;
102 
103         // This env variable is only meant to bypass nonce regeneration during
104         // tests to get determinisic results. It should never be used outside
105         // of testing environments.
106         let nonce = if let Ok(val) = std::env::var("GST_SODIUM_ENCRYPT_NONCE") {
107             let bytes = hex::decode(val).expect("Failed to decode hex variable");
108             assert_eq!(bytes.len(), box_::NONCEBYTES);
109             box_::Nonce::from_slice(&bytes).unwrap()
110         } else {
111             box_::gen_nonce()
112         };
113 
114         let precomputed_key = box_::precompute(&receiver_key, &sender_key);
115 
116         Ok(Self {
117             adapter: gst_base::UniqueAdapter::new(),
118             precomputed_key,
119             nonce,
120             block_size: props.block_size,
121             write_headers: true,
122         })
123     }
124 
seal(&mut self, message: &[u8]) -> Vec<u8>125     fn seal(&mut self, message: &[u8]) -> Vec<u8> {
126         let ciphertext = box_::seal_precomputed(message, &self.nonce, &self.precomputed_key);
127         self.nonce.increment_le_inplace();
128         ciphertext
129     }
130 
encrypt_message(&mut self, buffer: &gst::BufferRef) -> gst::Buffer131     fn encrypt_message(&mut self, buffer: &gst::BufferRef) -> gst::Buffer {
132         let map = buffer
133             .map_readable()
134             .expect("Failed to map buffer readable");
135 
136         let sealed = self.seal(&map);
137         gst::Buffer::from_mut_slice(sealed)
138     }
139 
encrypt_blocks(&mut self, block_size: usize) -> BufferVec140     fn encrypt_blocks(&mut self, block_size: usize) -> BufferVec {
141         assert_ne!(block_size, 0);
142 
143         let mut buffers = BufferVec::new();
144 
145         // As long we have enough bytes to encrypt a block, or more, we do so
146         // else the leftover bytes on the adapter will be pushed when EOS
147         // is sent.
148         while self.adapter.available() >= block_size {
149             let buffer = self.adapter.take_buffer(block_size).unwrap();
150             let out_buf = self.encrypt_message(&buffer);
151 
152             buffers.push(out_buf);
153         }
154 
155         buffers
156     }
157 }
158 
/// Implementation struct of the encrypter element.
pub struct Encrypter {
    /// Source pad on which the header and the sealed blocks are pushed.
    srcpad: gst::Pad,
    /// Sink pad receiving the buffers to encrypt.
    sinkpad: gst::Pad,
    /// Property values; cloned into a `State` on the NULL -> READY transition.
    props: Mutex<Props>,
    /// Streaming state; `None` until NULL -> READY and again after READY -> NULL.
    state: Mutex<Option<State>>,
}
165 
166 impl Encrypter {
sink_chain( &self, pad: &gst::Pad, element: &super::Encrypter, buffer: gst::Buffer, ) -> Result<gst::FlowSuccess, gst::FlowError>167     fn sink_chain(
168         &self,
169         pad: &gst::Pad,
170         element: &super::Encrypter,
171         buffer: gst::Buffer,
172     ) -> Result<gst::FlowSuccess, gst::FlowError> {
173         gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
174 
175         let mut buffers = BufferVec::new();
176         let mut state_guard = self.state.lock().unwrap();
177         let state = state_guard.as_mut().unwrap();
178 
179         if state.write_headers {
180             let mut headers = Vec::with_capacity(40);
181             headers.extend_from_slice(crate::TYPEFIND_HEADER);
182             // Write the Nonce used into the stream.
183             headers.extend_from_slice(state.nonce.as_ref());
184             // Write the block_size into the stream
185             headers.extend_from_slice(&state.block_size.to_le_bytes());
186 
187             buffers.push(gst::Buffer::from_mut_slice(headers));
188             state.write_headers = false;
189         }
190 
191         state.adapter.push(buffer);
192 
193         // Encrypt the whole blocks, if any, and push them.
194         buffers.extend(state.encrypt_blocks(state.block_size as usize));
195 
196         drop(state_guard);
197 
198         for buffer in buffers {
199             self.srcpad.push(buffer).map_err(|err| {
200                 gst_error!(CAT, obj: element, "Failed to push buffer {:?}", err);
201                 err
202             })?;
203         }
204 
205         Ok(gst::FlowSuccess::Ok)
206     }
207 
sink_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool208     fn sink_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool {
209         use gst::EventView;
210 
211         gst_log!(CAT, obj: pad, "Handling event {:?}", event);
212 
213         match event.view() {
214             EventView::Caps(_) => {
215                 // We send our own caps downstream
216                 let caps = gst::Caps::builder("application/x-sodium-encrypted").build();
217                 self.srcpad.push_event(gst::event::Caps::new(&caps))
218             }
219             EventView::Eos(_) => {
220                 let mut state_mutex = self.state.lock().unwrap();
221                 let mut buffers = BufferVec::new();
222                 // This will only be run after READY state,
223                 // and will be guaranted to be initialized
224                 let state = state_mutex.as_mut().unwrap();
225 
226                 // Now that all the full size blocks are pushed, drain the
227                 // rest of the adapter and push whatever is left.
228                 let avail = state.adapter.available();
229                 // logic error, all the complete blocks that can be pushed
230                 // should have been done in the sink_chain call.
231                 assert!(avail < state.block_size as usize);
232 
233                 if avail > 0 {
234                     let b = state.encrypt_blocks(avail);
235                     buffers.extend(b);
236                 }
237 
238                 // drop the lock before pushing into the pad
239                 drop(state_mutex);
240 
241                 for buffer in buffers {
242                     if let Err(err) = self.srcpad.push(buffer) {
243                         gst_error!(CAT, obj: element, "Failed to push buffer at EOS {:?}", err);
244                         return false;
245                     }
246                 }
247 
248                 pad.event_default(Some(element), event)
249             }
250             _ => pad.event_default(Some(element), event),
251         }
252     }
253 
src_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool254     fn src_event(&self, pad: &gst::Pad, element: &super::Encrypter, event: gst::Event) -> bool {
255         use gst::EventView;
256 
257         gst_log!(CAT, obj: pad, "Handling event {:?}", event);
258 
259         match event.view() {
260             EventView::Seek(_) => false,
261             _ => pad.event_default(Some(element), event),
262         }
263     }
264 
src_query( &self, pad: &gst::Pad, element: &super::Encrypter, query: &mut gst::QueryRef, ) -> bool265     fn src_query(
266         &self,
267         pad: &gst::Pad,
268         element: &super::Encrypter,
269         query: &mut gst::QueryRef,
270     ) -> bool {
271         use gst::QueryView;
272 
273         gst_log!(CAT, obj: pad, "Handling query {:?}", query);
274 
275         match query.view_mut() {
276             QueryView::Seeking(mut q) => {
277                 let format = q.format();
278                 q.set(
279                     false,
280                     gst::GenericFormattedValue::Other(format, -1),
281                     gst::GenericFormattedValue::Other(format, -1),
282                 );
283                 gst_log!(CAT, obj: pad, "Returning {:?}", q.query_mut());
284                 true
285             }
286             QueryView::Duration(ref mut q) => {
287                 use std::convert::TryInto;
288 
289                 if q.format() != gst::Format::Bytes {
290                     return pad.query_default(Some(element), query);
291                 }
292 
293                 /* First let's query the bytes duration upstream */
294                 let mut peer_query = gst::query::Duration::new(gst::Format::Bytes);
295 
296                 if !self.sinkpad.peer_query(&mut peer_query) {
297                     gst_error!(CAT, "Failed to query upstream duration");
298                     return false;
299                 }
300 
301                 let size = match peer_query.result().try_into().unwrap() {
302                     Some(gst::format::Bytes(size)) => size,
303                     None => {
304                         gst_error!(CAT, "Failed to query upstream duration");
305 
306                         return false;
307                     }
308                 };
309 
310                 let state = self.state.lock().unwrap();
311                 let state = match state.as_ref() {
312                     // If state isn't set, it means that the
313                     // element hasn't been activated yet.
314                     None => return false,
315                     Some(s) => s,
316                 };
317 
318                 // calculate the number of chunks that exist in the stream
319                 let total_chunks = (size + state.block_size as u64 - 1) / state.block_size as u64;
320                 // add the MAC of each block
321                 let size = size + total_chunks * box_::MACBYTES as u64;
322 
323                 // add static offsets
324                 let size = size + crate::HEADERS_SIZE as u64;
325 
326                 gst_debug!(CAT, obj: pad, "Setting duration bytes: {}", size);
327                 q.set(gst::format::Bytes(size));
328 
329                 true
330             }
331             _ => pad.query_default(Some(element), query),
332         }
333     }
334 }
335 
336 #[glib::object_subclass]
337 impl ObjectSubclass for Encrypter {
338     const NAME: &'static str = "RsSodiumEncrypter";
339     type Type = super::Encrypter;
340     type ParentType = gst::Element;
341 
with_class(klass: &Self::Class) -> Self342     fn with_class(klass: &Self::Class) -> Self {
343         let templ = klass.pad_template("sink").unwrap();
344         let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
345             .chain_function(|pad, parent, buffer| {
346                 Encrypter::catch_panic_pad_function(
347                     parent,
348                     || Err(gst::FlowError::Error),
349                     |encrypter, element| encrypter.sink_chain(pad, element, buffer),
350                 )
351             })
352             .event_function(|pad, parent, event| {
353                 Encrypter::catch_panic_pad_function(
354                     parent,
355                     || false,
356                     |encrypter, element| encrypter.sink_event(pad, element, event),
357                 )
358             })
359             .build();
360 
361         let templ = klass.pad_template("src").unwrap();
362         let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
363             .query_function(|pad, parent, query| {
364                 Encrypter::catch_panic_pad_function(
365                     parent,
366                     || false,
367                     |encrypter, element| encrypter.src_query(pad, element, query),
368                 )
369             })
370             .event_function(|pad, parent, event| {
371                 Encrypter::catch_panic_pad_function(
372                     parent,
373                     || false,
374                     |encrypter, element| encrypter.src_event(pad, element, event),
375                 )
376             })
377             .build();
378 
379         let props = Mutex::new(Props::default());
380         let state = Mutex::new(None);
381 
382         Self {
383             srcpad,
384             sinkpad,
385             props,
386             state,
387         }
388     }
389 }
390 
391 impl ObjectImpl for Encrypter {
properties() -> &'static [glib::ParamSpec]392     fn properties() -> &'static [glib::ParamSpec] {
393         static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
394             vec![
395                 glib::ParamSpec::new_boxed(
396                     "receiver-key",
397                     "Receiver Key",
398                     "The public key of the Receiver",
399                     glib::Bytes::static_type(),
400                     glib::ParamFlags::READWRITE,
401                 ),
402                 glib::ParamSpec::new_boxed(
403                     "sender-key",
404                     "Sender Key",
405                     "The private key of the Sender",
406                     glib::Bytes::static_type(),
407                     glib::ParamFlags::WRITABLE,
408                 ),
409                 glib::ParamSpec::new_uint(
410                     "block-size",
411                     "Block Size",
412                     "The block-size of the chunks",
413                     1024,
414                     std::u32::MAX,
415                     32768,
416                     glib::ParamFlags::READWRITE,
417                 ),
418             ]
419         });
420 
421         PROPERTIES.as_ref()
422     }
423 
constructed(&self, obj: &Self::Type)424     fn constructed(&self, obj: &Self::Type) {
425         self.parent_constructed(obj);
426 
427         obj.add_pad(&self.sinkpad).unwrap();
428         obj.add_pad(&self.srcpad).unwrap();
429     }
430 
set_property( &self, _obj: &Self::Type, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec, )431     fn set_property(
432         &self,
433         _obj: &Self::Type,
434         _id: usize,
435         value: &glib::Value,
436         pspec: &glib::ParamSpec,
437     ) {
438         match pspec.name() {
439             "sender-key" => {
440                 let mut props = self.props.lock().unwrap();
441                 props.sender_key = value.get().expect("type checked upstream");
442             }
443 
444             "receiver-key" => {
445                 let mut props = self.props.lock().unwrap();
446                 props.receiver_key = value.get().expect("type checked upstream");
447             }
448 
449             "block-size" => {
450                 let mut props = self.props.lock().unwrap();
451                 props.block_size = value.get().expect("type checked upstream");
452             }
453 
454             _ => unimplemented!(),
455         }
456     }
457 
property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value458     fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
459         match pspec.name() {
460             "receiver-key" => {
461                 let props = self.props.lock().unwrap();
462                 props.receiver_key.to_value()
463             }
464 
465             "block-size" => {
466                 let props = self.props.lock().unwrap();
467                 props.block_size.to_value()
468             }
469 
470             _ => unimplemented!(),
471         }
472     }
473 }
474 
475 impl ElementImpl for Encrypter {
metadata() -> Option<&'static gst::subclass::ElementMetadata>476     fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
477         static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
478             gst::subclass::ElementMetadata::new(
479                 "Encrypter",
480                 "Generic",
481                 "libsodium-based file encrypter",
482                 "Jordan Petridis <jordan@centricular.com>",
483             )
484         });
485 
486         Some(&*ELEMENT_METADATA)
487     }
488 
pad_templates() -> &'static [gst::PadTemplate]489     fn pad_templates() -> &'static [gst::PadTemplate] {
490         static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
491             let src_caps = gst::Caps::builder("application/x-sodium-encrypted").build();
492             let src_pad_template = gst::PadTemplate::new(
493                 "src",
494                 gst::PadDirection::Src,
495                 gst::PadPresence::Always,
496                 &src_caps,
497             )
498             .unwrap();
499 
500             let sink_pad_template = gst::PadTemplate::new(
501                 "sink",
502                 gst::PadDirection::Sink,
503                 gst::PadPresence::Always,
504                 &gst::Caps::new_any(),
505             )
506             .unwrap();
507 
508             vec![src_pad_template, sink_pad_template]
509         });
510 
511         PAD_TEMPLATES.as_ref()
512     }
513 
change_state( &self, element: &Self::Type, transition: gst::StateChange, ) -> Result<gst::StateChangeSuccess, gst::StateChangeError>514     fn change_state(
515         &self,
516         element: &Self::Type,
517         transition: gst::StateChange,
518     ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
519         gst_debug!(CAT, obj: element, "Changing state {:?}", transition);
520 
521         match transition {
522             gst::StateChange::NullToReady => {
523                 let props = self.props.lock().unwrap().clone();
524 
525                 // Create an internal state struct from the provided properties or
526                 // refuse to change state
527                 let state_ = State::from_props(&props).map_err(|err| {
528                     element.post_error_message(err);
529                     gst::StateChangeError
530                 })?;
531 
532                 let mut state = self.state.lock().unwrap();
533                 *state = Some(state_);
534             }
535             gst::StateChange::ReadyToNull => {
536                 let _ = self.state.lock().unwrap().take();
537             }
538             _ => (),
539         }
540 
541         let success = self.parent_change_state(element, transition)?;
542 
543         if transition == gst::StateChange::ReadyToNull {
544             let _ = self.state.lock().unwrap().take();
545         }
546 
547         Ok(success)
548     }
549 }
550