1 //! Low-level IPC mechanism for Sequoia.
2 //!
3 //! # Rationale
4 //!
5 //! Sequoia makes use of background services e.g. for managing and
6 //! updating public keys.
7 //!
8 //! # Design
9 //!
//! We use the filesystem as a namespace to discover services.  Every
//! service has a file called the rendezvous point.  Access to this
//! file is serialized using file locking.  The file contains a socket
//! address and a cookie that we use to connect to the server and to
//! authenticate ourselves.  If the file does not exist, is malformed,
//! or does not point to a usable server, we start a new one on demand.
16 //!
17 //! This design mimics Unix sockets, but works on Windows too.
18 //!
19 //! # External vs internal servers
20 //!
21 //! These servers can be either in external processes, or co-located
22 //! within the current process.  We will first start an external
23 //! process, and fall back to starting a thread instead.
24 //!
25 //! Using an external process is the preferred option.  It allows us
26 //! to continuously update the keys in the keystore, for example.  It
27 //! also means that we do not spawn a thread in your process, which is
28 //! frowned upon for various reasons.
29 //!
30 //! Please see [IPCPolicy] for more information.
31 //!
32 //! [IPCPolicy]: ../../sequoia_core/enum.IPCPolicy.html
33 //!
34 //! # Note
35 //!
//! Windows support is currently not implemented, but should be
//! straightforward.
38 
39 use std::fs;
40 use std::io::{self, Read, Write};
41 use std::net::{Ipv4Addr, SocketAddr, TcpStream, TcpListener};
42 use std::path::PathBuf;
43 
44 use anyhow::{anyhow, Result};
45 use fs2::FileExt;
46 use futures::{Future, Stream};
47 
48 use tokio_core::net;
49 use tokio_io::io::ReadHalf;
50 use tokio_io::AsyncRead;
51 
52 use capnp_rpc::{RpcSystem, twoparty};
53 use capnp_rpc::rpc_twoparty_capnp::Side;
54 
55 #[cfg(unix)]
56 use std::os::unix::{io::{IntoRawFd, FromRawFd}, fs::OpenOptionsExt};
57 #[cfg(windows)]
58 use std::os::windows::io::{AsRawSocket, IntoRawSocket, FromRawSocket};
59 #[cfg(windows)]
60 use winapi::um::winsock2;
61 
62 use std::process::{Command, Stdio};
63 use std::thread;
64 
65 use sequoia_openpgp as openpgp;
66 use sequoia_core as core;
67 
// Evaluates to `true` if the expression matches the pattern, and to
// `false` otherwise.  This is an `if let` that can be used as a
// boolean expression, which makes it possible to write things like:
//
// ```rust,nocompile
// if destructures_to!(Foo::Bar(_) = value)
//    || destructures_to!(Foo::Bam(_) = value) { ... }
// ```
// TODO: Replace with `std::matches!` once MSRV is bumped to 1.42.
#[cfg(test)]
macro_rules! destructures_to {
    ( $error: pat = $expr:expr ) => {
        {
            // Evaluate the expression exactly once.
            let value = $expr;
            match value {
                $error => true,
                _ => false,
            }
        }
    };
}
89 
90 #[macro_use] mod trace;
91 pub mod assuan;
92 pub mod gnupg;
93 mod keygrip;
94 pub use self::keygrip::Keygrip;
95 pub mod sexp;
96 
97 #[cfg(test)]
98 mod tests;
99 
// Picks one of two code fragments depending on the target platform.
//
// Both arms have to be given, in the order `unix`, `windows`.  The
// fragment for the platform that is not being built for is removed
// by `#[cfg]`, so each fragment may use items that exist only on its
// own platform.  Building for a target that is neither Unix nor
// Windows fails with a compile error.
macro_rules! platform {
    { unix => { $($unix:tt)* }, windows => { $($windows:tt)* } } => {
        if cfg!(unix) {
            #[cfg(unix)] { $($unix)* }
            #[cfg(not(unix))] { unreachable!() }
        } else if cfg!(windows) {
            #[cfg(windows)] { $($windows)* }
            #[cfg(not(windows))] { unreachable!() }
        } else {
            #[cfg(not(any(unix, windows)))] compile_error!("Unsupported platform");
            unreachable!()
        }
    }
}
114 
/// Servers need to implement this trait.
///
/// The handler is created by the service's `HandlerFactory` and is
/// invoked by `Server` for every client that presented the right
/// cookie.
pub trait Handler {
    /// Called on every connection.
    ///
    /// `network` is the server side of a two-party network set up on
    /// the accepted TCP stream.  The returned `RpcSystem` is spawned
    /// on the server's reactor by the caller.
    fn handle(&self,
              network: twoparty::VatNetwork<ReadHalf<net::TcpStream>>)
              -> RpcSystem<Side>;
}
122 
/// A factory for handlers.
///
/// A server calls the factory once, before it starts to accept RPC
/// connections, passing a copy of its descriptor and a handle to its
/// reactor.  If the factory fails, the server does not start serving.
pub type HandlerFactory = fn(descriptor: Descriptor,
                             handle: tokio_core::reactor::Handle)
                             -> Result<Box<dyn Handler>>;
127 
/// A descriptor is used to connect to a service.
///
/// It bundles everything needed to find a running server, or to
/// start a new one, either as an external process or as a thread.
#[derive(Clone)]
pub struct Descriptor {
    // Provides the home and lib directories, the ephemeral flag, and
    // the default IPC policy.
    ctx: core::Context,
    // Path of the rendezvous file, which holds the cookie followed
    // by the socket address of an external server.
    rendezvous: PathBuf,
    // Program that is run to start an external server.
    executable: PathBuf,
    // Creates the handler once a server starts serving.
    factory: HandlerFactory,
}
136 
137 impl Descriptor {
138     /// Create a descriptor given its rendezvous point, the path to
139     /// the servers executable file, and a handler factory.
new(ctx: &core::Context, rendezvous: PathBuf, executable: PathBuf, factory: HandlerFactory) -> Self140     pub fn new(ctx: &core::Context, rendezvous: PathBuf,
141                executable: PathBuf, factory: HandlerFactory)
142                -> Self {
143         Descriptor {
144             ctx: ctx.clone(),
145             rendezvous,
146             executable,
147             factory,
148         }
149     }
150 
    /// Returns the context.
    ///
    /// This is the descriptor's own copy of the context it was
    /// created with.
    pub fn context(&self) -> &core::Context {
        &self.ctx
    }
155 
156     /// Connects to a descriptor, starting the server if necessary.
connect(&self, handle: &tokio_core::reactor::Handle) -> Result<RpcSystem<Side>>157     pub fn connect(&self, handle: &tokio_core::reactor::Handle)
158                    -> Result<RpcSystem<Side>> {
159         self.connect_with_policy(handle, *self.ctx.ipc_policy())
160     }
161 
162     /// Connects to a descriptor, starting the server if necessary.
163     ///
164     /// This function does not use the context's IPC policy, but uses
165     /// the given one.
connect_with_policy(&self, handle: &tokio_core::reactor::Handle, policy: core::IPCPolicy) -> Result<RpcSystem<Side>>166     pub fn connect_with_policy(&self, handle: &tokio_core::reactor::Handle,
167                                policy: core::IPCPolicy)
168                    -> Result<RpcSystem<Side>> {
169         let do_connect = |cookie: Cookie, mut s: TcpStream| {
170             cookie.send(&mut s)?;
171 
172             /* Tokioize.  */
173             let stream = net::TcpStream::from_stream(s, &handle)?;
174             stream.set_nodelay(true)?;
175             let (reader, writer) = stream.split();
176 
177             let network =
178                 Box::new(twoparty::VatNetwork::new(reader, writer,
179                                                    Side::Client,
180                                                    Default::default()));
181 
182             Ok(RpcSystem::new(network, None))
183         };
184 
185         fs::create_dir_all(self.ctx.home())?;
186         let mut file = fs::OpenOptions::new();
187         file
188             .read(true)
189             .write(true)
190             .create(true);
191         #[cfg(unix)]
192         file.mode(0o600);
193         let mut file = file.open(&self.rendezvous)?;
194         file.lock_exclusive()?;
195 
196         let mut c = vec![];
197         file.read_to_end(&mut c)?;
198 
199         if let Some((cookie, rest)) = Cookie::extract(c) {
200             let stream = String::from_utf8(rest).map_err(drop)
201                 .and_then(|rest| rest.parse::<SocketAddr>().map_err(drop))
202                 .and_then(|addr| TcpStream::connect(addr).map_err(drop));
203 
204             if let Ok(s) = stream {
205                 do_connect(cookie, s)
206             } else {
207                 /* Failed to connect.  Invalidate the cookie and try again.  */
208                 file.set_len(0)?;
209                 drop(file);
210                 self.connect(handle)
211             }
212         } else {
213             let cookie = Cookie::new();
214 
215             let (addr, external) = match policy {
216                 core::IPCPolicy::Internal => self.start(false)?,
217                 core::IPCPolicy::External => self.start(true)?,
218                 core::IPCPolicy::Robust => self.start(true)
219                     .or_else(|_| self.start(false))?
220             };
221 
222             /* XXX: It'd be nice not to waste this connection.  */
223             cookie.send(&mut TcpStream::connect(addr)?)?;
224 
225             if external {
226                 /* Write connection information to file.  */
227                 file.set_len(0)?;
228                 file.write_all(&cookie.0)?;
229                 write!(file, "{}", addr)?;
230             }
231             drop(file);
232 
233             do_connect(cookie, TcpStream::connect(addr)?)
234         }
235     }
236 
237     /// Start the service, either as an external process or as a
238     /// thread.
start(&self, external: bool) -> Result<(SocketAddr, bool)>239     fn start(&self, external: bool) -> Result<(SocketAddr, bool)> {
240         let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
241         let addr = listener.local_addr()?;
242 
243         /* Start the server, connect to it, and send the cookie.  */
244         if external {
245             self.fork(listener)?;
246         } else {
247             self.spawn(listener)?;
248         }
249 
250         Ok((addr, external))
251     }
252 
    /// Starts the server as an external process.
    ///
    /// The executable is run with `--home`, `--lib`, and
    /// `--ephemeral` taken from the context, and with stdout and
    /// stderr discarded.  The listening socket is handed to the
    /// child: on Unix as its stdin, on Windows as an inheritable
    /// handle whose value is passed in the `SOCKET` environment
    /// variable.
    ///
    /// The child process is not waited for.
    fn fork(&self, listener: TcpListener) -> Result<()> {
        let mut cmd = Command::new(&self.executable);
        cmd
            .arg("--home")
            .arg(self.ctx.home())
            .arg("--lib")
            .arg(self.ctx.lib())
            .arg("--ephemeral")
            .arg(self.ctx.ephemeral().to_string())
            .stdout(Stdio::null())
            .stderr(Stdio::null());

        platform! {
            unix => {
                // Pass the listening TCP socket as child stdin.
                //
                // SAFETY: `into_raw_fd` gives up ownership of the
                // descriptor, so the `Stdio` is its only owner.
                cmd.stdin(unsafe { Stdio::from_raw_fd(listener.into_raw_fd()) });
            },
            windows => {
                // Sockets for `TcpListener` are not inheritable by default, so
                // let's make them so, since we'll pass them to a child process.
                //
                // SAFETY: The handle is the socket of `listener`,
                // which is still open at this point.
                unsafe {
                    match winapi::um::handleapi::SetHandleInformation(
                        listener.as_raw_socket() as _,
                        winapi::um::winbase::HANDLE_FLAG_INHERIT,
                        winapi::um::winbase::HANDLE_FLAG_INHERIT,
                    ) {
                        // Zero signals failure.
                        0 => Err(std::io::Error::last_os_error()),
                        _ => Ok(())
                    }?
                };
                // We can't pass the socket to stdin directly on Windows, since
                // only non-overlapped (blocking) I/O handles can be redirected
                // there.  We use Tokio (async I/O), so we just pass it via env
                // var rather than establishing a separate channel to pass the
                // socket through.
                cmd.env("SOCKET", format!("{}", listener.into_raw_socket()));
            }
        }

        cmd.spawn()?;
        Ok(())
    }
294 
spawn(&self, l: TcpListener) -> Result<()>295     fn spawn(&self, l: TcpListener) -> Result<()> {
296         let descriptor = self.clone();
297         thread::spawn(move || -> Result<()> {
298             Ok(Server::new(descriptor)
299                .expect("Failed to spawn server") // XXX
300                .serve_listener(l)
301                .expect("Failed to spawn server")) // XXX
302         });
303         Ok(())
304     }
305 }
306 
/// A server.
///
/// A server accepts connections on a listening socket, checks the
/// cookie sent by every client, and passes authenticated connections
/// to the handler created by the descriptor's factory.
pub struct Server {
    // Reactor that drives the listener and all RPC systems.
    core: tokio_core::reactor::Core,
    // Describes the service; provides the handler factory.
    descriptor: Descriptor,
}
312 
313 impl Server {
314     /// Creates a new server for the descriptor.
new(descriptor: Descriptor) -> Result<Self>315     pub fn new(descriptor: Descriptor) -> Result<Self> {
316         Ok(Server {
317             core: tokio_core::reactor::Core::new()?,
318             descriptor,
319         })
320     }
321 
322     /// Creates a Context from `env::args()`.
context() -> Result<core::Context>323     pub fn context() -> Result<core::Context> {
324         use std::env::args;
325         let args: Vec<String> = args().collect();
326 
327         if args.len() != 7 || args[1] != "--home"
328             || args[3] != "--lib" || args[5] != "--ephemeral" {
329                 return Err(anyhow!(
330                     "Usage: {} --home <HOMEDIR> --lib <LIBDIR> \
331                      --ephemeral true|false", args[0]));
332             }
333 
334         let mut cfg = core::Context::configure()
335             .home(&args[2]).lib(&args[4]);
336 
337         if let Ok(ephemeral) = args[6].parse() {
338             if ephemeral {
339                 cfg.set_ephemeral();
340             }
341         } else {
342             return Err(anyhow!(
343                 "Expected 'true' or 'false' for --ephemeral, got: {}",
344                 args[6]));
345         }
346 
347         cfg.build()
348     }
349 
    /// Turns this process into a server.
    ///
    /// External servers must call this early on.
    ///
    /// On Unix this expects 'stdin' to be a listening TCP socket.
    /// On Windows this expects `SOCKET` env var to be set to a listening socket
    /// of the Windows Sockets API `SOCKET` value.
    ///
    /// This is the counterpart to how `Descriptor` starts external
    /// servers.  Once the socket has been taken over, connections
    /// are served on it.
    ///
    /// # Errors
    ///
    /// On Windows, fails if `SOCKET` is not set or cannot be parsed.
    /// On all platforms, fails if serving the connections fails.
    ///
    /// # Example
    ///
    /// ```compile_fail
    /// // We cannot run this because sequoia-store is not built yet.
    /// extern crate sequoia_core;
    /// extern crate sequoia_net;
    /// extern crate sequoia_store;
    ///
    /// use sequoia_ipc::Server;
    ///
    /// fn main() {
    ///     let ctx = Server::context()
    ///         .expect("Failed to create context");
    ///     Server::new(sequoia_store::descriptor(&ctx))
    ///         .expect("Failed to create server")
    ///         .serve()
    ///         .expect("Failed to start server");
    /// }
    /// ```
    pub fn serve(&mut self) -> Result<()> {
        let listener = platform! {
            // SAFETY (both arms): This relies on the parent having
            // handed us a listening socket as documented above; the
            // listener takes ownership of it.
            unix => { unsafe { TcpListener::from_raw_fd(0) } },
            windows => {
                let socket = std::env::var("SOCKET")?.parse()?;
                unsafe { TcpListener::from_raw_socket(socket) }
            }
        };
        self.serve_listener(listener)
    }
387 
serve_listener(&mut self, l: TcpListener) -> Result<()>388     fn serve_listener(&mut self, l: TcpListener) -> Result<()> {
389         /* The first client tells us our cookie.  */
390         let cookie = {
391             /* XXX: It'd be nice to recycle this connection.  */
392             let mut i = l.accept()?;
393             Cookie::receive(&mut i.0)?
394         };
395 
396         let handler = (self.descriptor.factory)(self.descriptor.clone(), self.core.handle())?;
397 
398         /* Tokioize.  */
399         let handle = self.core.handle();
400         let a = l.local_addr()?;
401         let socket = tokio_core::net::TcpListener::from_listener(l, &a, &handle).unwrap();
402 
403         let done = socket.incoming().and_then(|(socket, _addr)| {
404             let _ = socket.set_nodelay(true);
405             Cookie::receive_async(socket)
406         }).and_then(|(socket, received_cookie)| {
407             if received_cookie == cookie {
408                 Ok(socket)
409             } else {
410                 Err(io::Error::new(io::ErrorKind::BrokenPipe, "Bad cookie."))
411             }
412         }).for_each(|socket| {
413             let (reader, writer) = socket.split();
414 
415             let network =
416                 twoparty::VatNetwork::new(reader, writer,
417                                           Side::Server, Default::default());
418 
419             let rpc_system = handler.handle(network);
420             handle.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
421             Ok(())
422         });
423 
424         Ok(self.core.run(done)?)
425     }
426 }
427 
/// Cookies are used to authenticate clients.
///
/// A cookie consists of `Cookie::SIZE` bytes.  It is stored at the
/// beginning of the rendezvous file, and it is the first thing a
/// client sends on every new connection.
struct Cookie(Vec<u8>);
430 
431 use rand::RngCore;
432 use rand::rngs::OsRng;
433 
434 impl Cookie {
435     const SIZE: usize = 32;
436 
437     /// Make a new cookie.
new() -> Self438     fn new() -> Self {
439         let mut c = vec![0; Cookie::SIZE];
440         OsRng.fill_bytes(&mut c);
441         Cookie(c)
442     }
443 
444     /// Make a new cookie from a slice.
from(buf: &[u8]) -> Option<Self>445     fn from(buf: &[u8]) -> Option<Self> {
446         if buf.len() == Cookie::SIZE {
447             let mut c = Vec::with_capacity(Cookie::SIZE);
448             c.extend_from_slice(buf);
449             Some(Cookie(c))
450         } else {
451             None
452         }
453     }
454 
455     /// Given a vector starting with a cookie, extract it and return
456     /// the rest.
extract(mut buf: Vec<u8>) -> Option<(Self, Vec<u8>)>457     fn extract(mut buf: Vec<u8>) -> Option<(Self, Vec<u8>)> {
458         if buf.len() >= Cookie::SIZE {
459             let r = buf.split_off(Cookie::SIZE);
460             Some((Cookie(buf), r))
461         } else {
462             None
463         }
464     }
465 
466     /// Read a cookie from 'from'.
receive<R: Read>(from: &mut R) -> Result<Self>467     fn receive<R: Read>(from: &mut R) -> Result<Self> {
468         let mut buf = vec![0; Cookie::SIZE];
469         from.read_exact(&mut buf)?;
470         Ok(Cookie(buf))
471     }
472 
473     /// Asynchronously read a cookie from 'socket'.
receive_async(socket: net::TcpStream) -> impl Future<Item = (net::TcpStream, Cookie), Error = io::Error>474     fn receive_async(socket: net::TcpStream)
475         -> impl Future<Item = (net::TcpStream, Cookie), Error = io::Error> {
476         let buf = vec![0; Cookie::SIZE];
477         tokio_io::io::read_exact(socket, buf)
478             .and_then(|(socket, buf)| {
479                 Ok((socket, Cookie::from(&buf).expect("enough bytes read")))
480             })
481     }
482 
483 
484     /// Write a cookie to 'to'.
send<W: Write>(&self, to: &mut W) -> io::Result<()>485     fn send<W: Write>(&self, to: &mut W) -> io::Result<()> {
486         to.write_all(&self.0)
487     }
488 }
489 
490 impl PartialEq for Cookie {
eq(&self, other: &Cookie) -> bool491     fn eq(&self, other: &Cookie) -> bool {
492         // First, compare the length.
493         self.0.len() == other.0.len()
494             // The length is not a secret, hence we can use && here.
495             && unsafe {
496                 ::memsec::memeq(self.0.as_ptr(),
497                                 other.0.as_ptr(),
498                                 self.0.len())
499             }
500     }
501 }
502 
#[derive(thiserror::Error, Debug)]
/// Errors returned from the network routines.
pub enum Error {
    /// Handshake failed.
    ///
    /// The string is included in the error message.
    #[error("Handshake failed: {0}")]
    HandshakeFailed(String),
    /// Connection closed unexpectedly.
    // NOTE(review): The payload is not part of the error message.
    // Presumably it is the data received before the connection was
    // closed; confirm against the code creating this error.
    #[error("Connection closed unexpectedly.")]
    ConnectionClosed(Vec<u8>),
}
513 
// Global initialization and cleanup of the Windows Sockets API (WSA) module.
// NOTE: This has to be top-level in order for `ctor::{ctor, dtor}` to work.
#[cfg(windows)]
use std::sync::atomic::{AtomicBool, Ordering};
// Tells `wsa_cleanup` whether it has to call `WSACleanup`.  Written
// by `wsa_startup`.
#[cfg(windows)]
static WSA_INITED: AtomicBool = AtomicBool::new(false);
520 
// Initializes the Windows Sockets API, requesting version 2.2, and
// records whether that worked.  Registered using `ctor::ctor`.
#[cfg(windows)]
#[ctor::ctor]
fn wsa_startup() {
    // SAFETY: `WSAStartup` only writes to the structure we pass in.
    // The structure is plain data, hence all zeros is a valid value.
    let ret = unsafe {
        winsock2::WSAStartup(
            0x202, // version 2.2
            &mut std::mem::zeroed(),
        )
    };
    // `WSAStartup` returns zero on success.  Only then the matching
    // `WSACleanup` must be called.
    WSA_INITED.store(ret == 0, Ordering::SeqCst);
}
532 
// Calls `WSACleanup` if `wsa_startup` set `WSA_INITED`.  Registered
// using `ctor::dtor`.
#[cfg(windows)]
#[ctor::dtor]
fn wsa_cleanup() {
    if !WSA_INITED.load(Ordering::SeqCst) {
        return;
    }
    // There is nothing we could do about a failure here, hence the
    // result is ignored.
    let _ = unsafe { winsock2::WSACleanup() };
}
540