1 //! Low-level IPC mechanism for Sequoia.
2 //!
3 //! # Rationale
4 //!
5 //! Sequoia makes use of background services e.g. for managing and
6 //! updating public keys.
7 //!
8 //! # Design
9 //!
//! We use the filesystem as a namespace to discover services.  Every
//! service has a file called its rendezvous point.  Access to this
//! file is serialized using file locking.  This file contains a
//! socket address and a cookie that we use to connect to the server
//! and to authenticate ourselves.  If the file does not exist, is
//! malformed, or does not point to a usable server, we start a new
//! one on demand.
16 //!
17 //! This design mimics Unix sockets, but works on Windows too.
18 //!
19 //! # External vs internal servers
20 //!
21 //! These servers can be either in external processes, or co-located
22 //! within the current process. We will first start an external
23 //! process, and fall back to starting a thread instead.
24 //!
25 //! Using an external process is the preferred option. It allows us
26 //! to continuously update the keys in the keystore, for example. It
27 //! also means that we do not spawn a thread in your process, which is
28 //! frowned upon for various reasons.
29 //!
30 //! Please see [IPCPolicy] for more information.
31 //!
32 //! [IPCPolicy]: ../../sequoia_core/enum.IPCPolicy.html
33 //!
34 //! # Note
35 //!
//! Windows support is currently not implemented, but should be
//! straightforward.
38
39 use std::fs;
40 use std::io::{self, Read, Write};
41 use std::net::{Ipv4Addr, SocketAddr, TcpStream, TcpListener};
42 use std::path::PathBuf;
43
44 use anyhow::{anyhow, Result};
45 use fs2::FileExt;
46 use futures::{Future, Stream};
47
48 use tokio_core::net;
49 use tokio_io::io::ReadHalf;
50 use tokio_io::AsyncRead;
51
52 use capnp_rpc::{RpcSystem, twoparty};
53 use capnp_rpc::rpc_twoparty_capnp::Side;
54
55 #[cfg(unix)]
56 use std::os::unix::{io::{IntoRawFd, FromRawFd}, fs::OpenOptionsExt};
57 #[cfg(windows)]
58 use std::os::windows::io::{AsRawSocket, IntoRawSocket, FromRawSocket};
59 #[cfg(windows)]
60 use winapi::um::winsock2;
61
62 use std::process::{Command, Stdio};
63 use std::thread;
64
65 use sequoia_openpgp as openpgp;
66 use sequoia_core as core;
67
// Evaluates to `true` if the expression matches the pattern and to
// `false` otherwise.  In other words, it turns an `if let` into an
// expression, so that it is possible to do things like:
//
// ```rust,nocompile
// if destructures_to!(Foo::Bar(_) = value)
//    || destructures_to!(Foo::Bam(_) = value) { ... }
// ```
// TODO: Replace with `std::matches!` once MSRV is bumped to 1.42.
#[cfg(test)]
macro_rules! destructures_to {
    ( $error: pat = $expr:expr ) => {
        {
            // Evaluate the expression exactly once, then test it
            // against the pattern.
            let value = $expr;
            match value {
                $error => true,
                _ => false,
            }
        }
    };
}
89
90 #[macro_use] mod trace;
91 pub mod assuan;
92 pub mod gnupg;
93 mod keygrip;
94 pub use self::keygrip::Keygrip;
95 pub mod sexp;
96
97 #[cfg(test)]
98 mod tests;
99
// Selects platform-specific code.
//
// Expands to the tokens given for `unix` when compiling for a Unix
// target and to the tokens given for `windows` when compiling for
// Windows.  On any other target the expansion contains a
// `compile_error!`.
//
// Each branch of the `if cfg!(..)` chain is guarded a second time by
// `#[cfg(..)]` attributes: the attributes remove the foreign
// platform's tokens before they are compiled, and `unreachable!()`
// stands in for them so that the branch that cannot be taken is still
// a valid expression.
macro_rules! platform {
    { unix => { $($unix:tt)* }, windows => { $($windows:tt)* } } => {
        if cfg!(unix) {
            #[cfg(unix)] { $($unix)* }
            #[cfg(not(unix))] { unreachable!() }
        } else if cfg!(windows) {
            #[cfg(windows)] { $($windows)* }
            #[cfg(not(windows))] { unreachable!() }
        } else {
            #[cfg(not(any(unix, windows)))] compile_error!("Unsupported platform");
            unreachable!()
        }
    }
}
114
/// Servers need to implement this trait.
///
/// A handler is created by a [`HandlerFactory`] when the server
/// starts serving, and is then asked to handle every connection the
/// server accepts.
pub trait Handler {
    /// Called on every connection.
    ///
    /// The server invokes this once a client has connected and
    /// presented the expected cookie.  `network` is the server side
    /// of the two-party network built on that connection; the
    /// returned RPC system is spawned on the server's reactor.
    fn handle(&self,
              network: twoparty::VatNetwork<ReadHalf<net::TcpStream>>)
              -> RpcSystem<Side>;
}
122
/// A factory for handlers.
///
/// The server calls the factory once when it starts serving, passing
/// a clone of its descriptor and a handle to its reactor.  An error
/// returned by the factory aborts serving.
pub type HandlerFactory = fn(descriptor: Descriptor,
                             handle: tokio_core::reactor::Handle)
                             -> Result<Box<dyn Handler>>;
127
/// A descriptor is used to connect to a service.
///
/// It bundles everything needed to find a running server, or to
/// start a new one if none is reachable.
#[derive(Clone)]
pub struct Descriptor {
    /// Context supplying the home and lib directories, the ephemeral
    /// flag, and the default IPC policy.
    ctx: core::Context,
    /// Path of the rendezvous file holding the cookie and the
    /// server's socket address.
    rendezvous: PathBuf,
    /// Path of the executable run when starting an external server.
    executable: PathBuf,
    /// Creates the handler when a server starts serving.
    factory: HandlerFactory,
}
136
137 impl Descriptor {
138 /// Create a descriptor given its rendezvous point, the path to
139 /// the servers executable file, and a handler factory.
new(ctx: &core::Context, rendezvous: PathBuf, executable: PathBuf, factory: HandlerFactory) -> Self140 pub fn new(ctx: &core::Context, rendezvous: PathBuf,
141 executable: PathBuf, factory: HandlerFactory)
142 -> Self {
143 Descriptor {
144 ctx: ctx.clone(),
145 rendezvous,
146 executable,
147 factory,
148 }
149 }
150
/// Returns the context.
///
/// This is the descriptor's own copy of the context that was given
/// to `Descriptor::new`.
pub fn context(&self) -> &core::Context {
    &self.ctx
}
155
156 /// Connects to a descriptor, starting the server if necessary.
connect(&self, handle: &tokio_core::reactor::Handle) -> Result<RpcSystem<Side>>157 pub fn connect(&self, handle: &tokio_core::reactor::Handle)
158 -> Result<RpcSystem<Side>> {
159 self.connect_with_policy(handle, *self.ctx.ipc_policy())
160 }
161
162 /// Connects to a descriptor, starting the server if necessary.
163 ///
164 /// This function does not use the context's IPC policy, but uses
165 /// the given one.
connect_with_policy(&self, handle: &tokio_core::reactor::Handle, policy: core::IPCPolicy) -> Result<RpcSystem<Side>>166 pub fn connect_with_policy(&self, handle: &tokio_core::reactor::Handle,
167 policy: core::IPCPolicy)
168 -> Result<RpcSystem<Side>> {
169 let do_connect = |cookie: Cookie, mut s: TcpStream| {
170 cookie.send(&mut s)?;
171
172 /* Tokioize. */
173 let stream = net::TcpStream::from_stream(s, &handle)?;
174 stream.set_nodelay(true)?;
175 let (reader, writer) = stream.split();
176
177 let network =
178 Box::new(twoparty::VatNetwork::new(reader, writer,
179 Side::Client,
180 Default::default()));
181
182 Ok(RpcSystem::new(network, None))
183 };
184
185 fs::create_dir_all(self.ctx.home())?;
186 let mut file = fs::OpenOptions::new();
187 file
188 .read(true)
189 .write(true)
190 .create(true);
191 #[cfg(unix)]
192 file.mode(0o600);
193 let mut file = file.open(&self.rendezvous)?;
194 file.lock_exclusive()?;
195
196 let mut c = vec![];
197 file.read_to_end(&mut c)?;
198
199 if let Some((cookie, rest)) = Cookie::extract(c) {
200 let stream = String::from_utf8(rest).map_err(drop)
201 .and_then(|rest| rest.parse::<SocketAddr>().map_err(drop))
202 .and_then(|addr| TcpStream::connect(addr).map_err(drop));
203
204 if let Ok(s) = stream {
205 do_connect(cookie, s)
206 } else {
207 /* Failed to connect. Invalidate the cookie and try again. */
208 file.set_len(0)?;
209 drop(file);
210 self.connect(handle)
211 }
212 } else {
213 let cookie = Cookie::new();
214
215 let (addr, external) = match policy {
216 core::IPCPolicy::Internal => self.start(false)?,
217 core::IPCPolicy::External => self.start(true)?,
218 core::IPCPolicy::Robust => self.start(true)
219 .or_else(|_| self.start(false))?
220 };
221
222 /* XXX: It'd be nice not to waste this connection. */
223 cookie.send(&mut TcpStream::connect(addr)?)?;
224
225 if external {
226 /* Write connection information to file. */
227 file.set_len(0)?;
228 file.write_all(&cookie.0)?;
229 write!(file, "{}", addr)?;
230 }
231 drop(file);
232
233 do_connect(cookie, TcpStream::connect(addr)?)
234 }
235 }
236
237 /// Start the service, either as an external process or as a
238 /// thread.
start(&self, external: bool) -> Result<(SocketAddr, bool)>239 fn start(&self, external: bool) -> Result<(SocketAddr, bool)> {
240 let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, 0)).unwrap();
241 let addr = listener.local_addr()?;
242
243 /* Start the server, connect to it, and send the cookie. */
244 if external {
245 self.fork(listener)?;
246 } else {
247 self.spawn(listener)?;
248 }
249
250 Ok((addr, external))
251 }
252
/// Starts the server as an external process.
///
/// Runs the descriptor's executable with `--home`, `--lib` and
/// `--ephemeral` taken from the context, and with stdout and stderr
/// discarded.  The listening socket is handed to the child: on Unix
/// as its stdin, on Windows as an inheritable socket whose value is
/// passed in the `SOCKET` environment variable.  The child is not
/// waited for.
///
/// # Errors
///
/// Fails if the socket cannot be made inheritable (Windows), or if
/// the process cannot be spawned.
fn fork(&self, listener: TcpListener) -> Result<()> {
    let mut cmd = Command::new(&self.executable);
    cmd
        .arg("--home")
        .arg(self.ctx.home())
        .arg("--lib")
        .arg(self.ctx.lib())
        .arg("--ephemeral")
        .arg(self.ctx.ephemeral().to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    platform! {
        unix => {
            // Pass the listening TCP socket as child stdin.
            //
            // SAFETY: `into_raw_fd` gives up ownership of the
            // descriptor, so the `Stdio` is its only owner.
            cmd.stdin(unsafe { Stdio::from_raw_fd(listener.into_raw_fd()) });
        },
        windows => {
            // Sockets for `TcpListener` are not inheritable by default, so
            // let's make them so, since we'll pass them to a child process.
            //
            // SAFETY: the socket is still owned by `listener` and
            // hence valid for the duration of the call.
            unsafe {
                match winapi::um::handleapi::SetHandleInformation(
                    listener.as_raw_socket() as _,
                    winapi::um::winbase::HANDLE_FLAG_INHERIT,
                    winapi::um::winbase::HANDLE_FLAG_INHERIT,
                ) {
                    0 => Err(std::io::Error::last_os_error()),
                    _ => Ok(())
                }?
            };
            // We can't pass the socket to stdin directly on Windows, since
            // only non-overlapped (blocking) I/O handles can be redirected
            // there.  We use Tokio (async I/O), so we just pass it via env
            // var rather than establishing a separate channel to pass the
            // socket through.
            cmd.env("SOCKET", format!("{}", listener.into_raw_socket()));
        }
    }

    cmd.spawn()?;
    Ok(())
}
294
spawn(&self, l: TcpListener) -> Result<()>295 fn spawn(&self, l: TcpListener) -> Result<()> {
296 let descriptor = self.clone();
297 thread::spawn(move || -> Result<()> {
298 Ok(Server::new(descriptor)
299 .expect("Failed to spawn server") // XXX
300 .serve_listener(l)
301 .expect("Failed to spawn server")) // XXX
302 });
303 Ok(())
304 }
305 }
306
/// A server.
///
/// Owns the reactor that drives all connections, and the descriptor
/// of the service it provides.
pub struct Server {
    /// Reactor on which connections are accepted and RPC systems run.
    core: tokio_core::reactor::Core,
    /// Descriptor of the service; supplies the handler factory.
    descriptor: Descriptor,
}
312
313 impl Server {
314 /// Creates a new server for the descriptor.
new(descriptor: Descriptor) -> Result<Self>315 pub fn new(descriptor: Descriptor) -> Result<Self> {
316 Ok(Server {
317 core: tokio_core::reactor::Core::new()?,
318 descriptor,
319 })
320 }
321
322 /// Creates a Context from `env::args()`.
context() -> Result<core::Context>323 pub fn context() -> Result<core::Context> {
324 use std::env::args;
325 let args: Vec<String> = args().collect();
326
327 if args.len() != 7 || args[1] != "--home"
328 || args[3] != "--lib" || args[5] != "--ephemeral" {
329 return Err(anyhow!(
330 "Usage: {} --home <HOMEDIR> --lib <LIBDIR> \
331 --ephemeral true|false", args[0]));
332 }
333
334 let mut cfg = core::Context::configure()
335 .home(&args[2]).lib(&args[4]);
336
337 if let Ok(ephemeral) = args[6].parse() {
338 if ephemeral {
339 cfg.set_ephemeral();
340 }
341 } else {
342 return Err(anyhow!(
343 "Expected 'true' or 'false' for --ephemeral, got: {}",
344 args[6]));
345 }
346
347 cfg.build()
348 }
349
/// Turns this process into a server.
///
/// External servers must call this early on.
///
/// On Unix, this expects 'stdin' to be a listening TCP socket.
/// On Windows, this expects the `SOCKET` env var to be set to the
/// Windows Sockets API `SOCKET` value of a listening socket.
///
/// # Errors
///
/// On Windows, fails if `SOCKET` is unset or cannot be parsed.
/// Otherwise fails if serving on the socket fails.
///
/// # Example
///
/// ```compile_fail
/// // We cannot run this because sequoia-store is not built yet.
/// extern crate sequoia_core;
/// extern crate sequoia_net;
/// extern crate sequoia_store;
///
/// use sequoia_ipc::Server;
///
/// fn main() {
///     let ctx = Server::context()
///         .expect("Failed to create context");
///     Server::new(sequoia_store::descriptor(&ctx))
///         .expect("Failed to create server")
///         .serve()
///         .expect("Failed to start server");
/// }
/// ```
pub fn serve(&mut self) -> Result<()> {
    let listener = platform! {
        // SAFETY: relies on the parent having installed the
        // listening socket as file descriptor 0, and on nothing
        // else in this process using stdin.
        unix => { unsafe { TcpListener::from_raw_fd(0) } },
        windows => {
            let socket = std::env::var("SOCKET")?.parse()?;
            // SAFETY: relies on `SOCKET` naming a listening socket
            // that this process inherited from its parent.
            unsafe { TcpListener::from_raw_socket(socket) }
        }
    };
    self.serve_listener(listener)
}
387
serve_listener(&mut self, l: TcpListener) -> Result<()>388 fn serve_listener(&mut self, l: TcpListener) -> Result<()> {
389 /* The first client tells us our cookie. */
390 let cookie = {
391 /* XXX: It'd be nice to recycle this connection. */
392 let mut i = l.accept()?;
393 Cookie::receive(&mut i.0)?
394 };
395
396 let handler = (self.descriptor.factory)(self.descriptor.clone(), self.core.handle())?;
397
398 /* Tokioize. */
399 let handle = self.core.handle();
400 let a = l.local_addr()?;
401 let socket = tokio_core::net::TcpListener::from_listener(l, &a, &handle).unwrap();
402
403 let done = socket.incoming().and_then(|(socket, _addr)| {
404 let _ = socket.set_nodelay(true);
405 Cookie::receive_async(socket)
406 }).and_then(|(socket, received_cookie)| {
407 if received_cookie == cookie {
408 Ok(socket)
409 } else {
410 Err(io::Error::new(io::ErrorKind::BrokenPipe, "Bad cookie."))
411 }
412 }).for_each(|socket| {
413 let (reader, writer) = socket.split();
414
415 let network =
416 twoparty::VatNetwork::new(reader, writer,
417 Side::Server, Default::default());
418
419 let rpc_system = handler.handle(network);
420 handle.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
421 Ok(())
422 });
423
424 Ok(self.core.run(done)?)
425 }
426 }
427
/// Cookies are used to authenticate clients.
///
/// A cookie is a sequence of `Cookie::SIZE` bytes.  It is stored at
/// the start of the rendezvous file and sent at the start of every
/// connection.
struct Cookie(Vec<u8>);
430
431 use rand::RngCore;
432 use rand::rngs::OsRng;
433
434 impl Cookie {
435 const SIZE: usize = 32;
436
437 /// Make a new cookie.
new() -> Self438 fn new() -> Self {
439 let mut c = vec![0; Cookie::SIZE];
440 OsRng.fill_bytes(&mut c);
441 Cookie(c)
442 }
443
444 /// Make a new cookie from a slice.
from(buf: &[u8]) -> Option<Self>445 fn from(buf: &[u8]) -> Option<Self> {
446 if buf.len() == Cookie::SIZE {
447 let mut c = Vec::with_capacity(Cookie::SIZE);
448 c.extend_from_slice(buf);
449 Some(Cookie(c))
450 } else {
451 None
452 }
453 }
454
455 /// Given a vector starting with a cookie, extract it and return
456 /// the rest.
extract(mut buf: Vec<u8>) -> Option<(Self, Vec<u8>)>457 fn extract(mut buf: Vec<u8>) -> Option<(Self, Vec<u8>)> {
458 if buf.len() >= Cookie::SIZE {
459 let r = buf.split_off(Cookie::SIZE);
460 Some((Cookie(buf), r))
461 } else {
462 None
463 }
464 }
465
466 /// Read a cookie from 'from'.
receive<R: Read>(from: &mut R) -> Result<Self>467 fn receive<R: Read>(from: &mut R) -> Result<Self> {
468 let mut buf = vec![0; Cookie::SIZE];
469 from.read_exact(&mut buf)?;
470 Ok(Cookie(buf))
471 }
472
473 /// Asynchronously read a cookie from 'socket'.
receive_async(socket: net::TcpStream) -> impl Future<Item = (net::TcpStream, Cookie), Error = io::Error>474 fn receive_async(socket: net::TcpStream)
475 -> impl Future<Item = (net::TcpStream, Cookie), Error = io::Error> {
476 let buf = vec![0; Cookie::SIZE];
477 tokio_io::io::read_exact(socket, buf)
478 .and_then(|(socket, buf)| {
479 Ok((socket, Cookie::from(&buf).expect("enough bytes read")))
480 })
481 }
482
483
484 /// Write a cookie to 'to'.
send<W: Write>(&self, to: &mut W) -> io::Result<()>485 fn send<W: Write>(&self, to: &mut W) -> io::Result<()> {
486 to.write_all(&self.0)
487 }
488 }
489
490 impl PartialEq for Cookie {
eq(&self, other: &Cookie) -> bool491 fn eq(&self, other: &Cookie) -> bool {
492 // First, compare the length.
493 self.0.len() == other.0.len()
494 // The length is not a secret, hence we can use && here.
495 && unsafe {
496 ::memsec::memeq(self.0.as_ptr(),
497 other.0.as_ptr(),
498 self.0.len())
499 }
500 }
501 }
502
#[derive(thiserror::Error, Debug)]
/// Errors returned from the network routines.
pub enum Error {
    /// Handshake failed.
    ///
    /// The string describes the failure and is included in the
    /// error message.
    #[error("Handshake failed: {0}")]
    HandshakeFailed(String),
    /// Connection closed unexpectedly.
    ///
    /// NOTE(review): the payload is not shown in the message;
    /// presumably it is the data received before the connection was
    /// closed -- confirm against the code constructing this variant.
    #[error("Connection closed unexpectedly.")]
    ConnectionClosed(Vec<u8>),
}
513
// Global initialization and cleanup of the Windows Sockets API (WSA) module.
// NOTE: This has to be top-level in order for `ctor::{ctor, dtor}` to work.
#[cfg(windows)]
use std::sync::atomic::{AtomicBool, Ordering};
// Flag written by `wsa_startup` and read by `wsa_cleanup` to decide
// whether `WSACleanup` is called.
#[cfg(windows)]
static WSA_INITED: AtomicBool = AtomicBool::new(false);
520
/// Initializes the Windows Sockets API, requesting version 2.2.
///
/// Registered via `ctor`.  Records in `WSA_INITED` whether the
/// initialization succeeded.
#[cfg(windows)]
#[ctor::ctor]
fn wsa_startup() {
    // SAFETY: `WSAStartup` only writes to the `WSADATA` structure we
    // pass in, for which an all-zero value is valid.
    let ret = unsafe {
        winsock2::WSAStartup(
            0x202, // version 2.2
            &mut std::mem::zeroed(),
        )
    };
    // `WSAStartup` returns zero on success, and an error code
    // otherwise.
    WSA_INITED.store(ret == 0, Ordering::SeqCst);
}
532
/// Releases the Windows Sockets API.
///
/// Registered via `ctor::dtor`.  Does nothing unless `WSA_INITED`
/// is set.  The result of `WSACleanup` is ignored.
#[cfg(windows)]
#[ctor::dtor]
fn wsa_cleanup() {
    if !WSA_INITED.load(Ordering::SeqCst) {
        return;
    }
    let _ = unsafe { winsock2::WSACleanup() };
}
540