1 // Copyright 2016 Mozilla Foundation
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 // For tokio_io::codec::length_delimited::Framed;
16 #![allow(deprecated)]
17 
18 use crate::cache::{storage_from_config, Storage};
19 use crate::compiler::{
20     get_compiler_info, CacheControl, CompileResult, Compiler, CompilerArguments, CompilerHasher,
21     CompilerKind, CompilerProxy, DistType, MissType,
22 };
23 #[cfg(feature = "dist-client")]
24 use crate::config;
25 use crate::config::Config;
26 use crate::dist;
27 use crate::jobserver::Client;
28 use crate::mock_command::{CommandCreatorSync, ProcessCommandCreator};
29 use crate::protocol::{Compile, CompileFinished, CompileResponse, Request, Response};
30 use crate::util;
31 #[cfg(feature = "dist-client")]
32 use anyhow::Context as _;
33 use filetime::FileTime;
34 use futures::sync::mpsc;
35 use futures::{future, stream, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
36 use futures_03::compat::Compat;
37 use futures_03::executor::ThreadPool;
38 use number_prefix::NumberPrefix;
39 use std::cell::RefCell;
40 use std::collections::HashMap;
41 use std::env;
42 use std::ffi::{OsStr, OsString};
43 use std::fs::metadata;
44 use std::io::{self, Write};
45 #[cfg(feature = "dist-client")]
46 use std::mem;
47 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
48 use std::path::PathBuf;
49 use std::pin::Pin;
50 use std::process::{ExitStatus, Output};
51 use std::rc::Rc;
52 use std::sync::Arc;
53 #[cfg(feature = "dist-client")]
54 use std::sync::Mutex;
55 use std::task::{Context, Waker};
56 use std::time::Duration;
57 use std::time::Instant;
58 use std::u64;
59 use tokio_compat::runtime::current_thread::Runtime;
60 use tokio_io::codec::length_delimited;
61 use tokio_io::codec::length_delimited::Framed;
62 use tokio_io::{AsyncRead, AsyncWrite};
63 use tokio_serde_bincode::{ReadBincode, WriteBincode};
64 use tokio_tcp::TcpListener;
65 use tokio_timer::{Delay, Timeout};
66 use tower::Service;
67 
68 use crate::errors::*;
69 
70 /// If the server is idle for this many seconds, shut down.
71 const DEFAULT_IDLE_TIMEOUT: u64 = 600;
72 
73 /// If the dist client couldn't be created, retry creation once this much
74 /// time has elapsed (or later).
75 #[cfg(feature = "dist-client")]
76 const DIST_CLIENT_RECREATE_TIMEOUT: Duration = Duration::from_secs(30);
77 
78 /// Result of background server startup.
79 #[derive(Debug, Serialize, Deserialize)]
80 pub enum ServerStartup {
81     /// Server started successfully on `port`.
82     Ok { port: u16 },
83     /// Server address already in use.
84     AddrInUse,
85     /// Timed out waiting for server startup.
86     TimedOut,
87     /// Server encountered an error.
88     Err { reason: String },
89 }
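// The startup status is reported back to the process that spawned the server
// over the path named by `SCCACHE_STARTUP_NOTIFY` (a Unix socket on Unix, a
// named pipe on Windows), encoded with `util::write_length_prefixed_bincode`;
// see `notify_server_startup` below.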
90 
91 /// Get the time the server should idle for before shutting down.
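/// A minimal sketch of how the `SCCACHE_IDLE_TIMEOUT` override behaves
/// (illustrative only, not a doctest):
///
/// ```ignore
/// std::env::set_var("SCCACHE_IDLE_TIMEOUT", "0"); // 0 disables idle shutdown
/// assert_eq!(get_idle_timeout(), 0);
/// std::env::remove_var("SCCACHE_IDLE_TIMEOUT");
/// assert_eq!(get_idle_timeout(), DEFAULT_IDLE_TIMEOUT);
/// ```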
92 fn get_idle_timeout() -> u64 {
93     // A value of 0 disables idle shutdown entirely.
94     env::var("SCCACHE_IDLE_TIMEOUT")
95         .ok()
96         .and_then(|s| s.parse().ok())
97         .unwrap_or(DEFAULT_IDLE_TIMEOUT)
98 }
99 
100 fn notify_server_startup_internal<W: Write>(mut w: W, status: ServerStartup) -> Result<()> {
101     util::write_length_prefixed_bincode(&mut w, status)
102 }
103 
104 #[cfg(unix)]
105 fn notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()> {
106     use std::os::unix::net::UnixStream;
107     let name = match *name {
108         Some(ref s) => s,
109         None => return Ok(()),
110     };
111     debug!("notify_server_startup({:?})", status);
112     let stream = UnixStream::connect(name)?;
113     notify_server_startup_internal(stream, status)
114 }
115 
116 #[cfg(windows)]
117 fn notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()> {
118     use std::fs::OpenOptions;
119 
120     let name = match *name {
121         Some(ref s) => s,
122         None => return Ok(()),
123     };
124     let pipe = OpenOptions::new().write(true).read(true).open(name)?;
125     notify_server_startup_internal(pipe, status)
126 }
127 
128 #[cfg(unix)]
129 fn get_signal(status: ExitStatus) -> i32 {
130     use std::os::unix::prelude::*;
131     status.signal().expect("must have signal")
132 }
133 #[cfg(windows)]
134 fn get_signal(_status: ExitStatus) -> i32 {
135     panic!("no signals on windows")
136 }
137 
138 pub struct DistClientContainer {
139     // The actual dist client state
140     #[cfg(feature = "dist-client")]
141     state: Mutex<DistClientState>,
142 }
143 
144 #[cfg(feature = "dist-client")]
145 struct DistClientConfig {
146     // Reusable items tied to an SccacheServer instance
147     pool: ThreadPool,
148 
149     // From the static dist configuration
150     scheduler_url: Option<config::HTTPUrl>,
151     auth: config::DistAuth,
152     cache_dir: PathBuf,
153     toolchain_cache_size: u64,
154     toolchains: Vec<config::DistToolchainConfig>,
155     rewrite_includes_only: bool,
156 }
157 
158 #[cfg(feature = "dist-client")]
159 enum DistClientState {
160     #[cfg(feature = "dist-client")]
161     Some(Box<DistClientConfig>, Arc<dyn dist::Client>),
162     #[cfg(feature = "dist-client")]
163     FailWithMessage(Box<DistClientConfig>, String),
164     #[cfg(feature = "dist-client")]
165     RetryCreateAt(Box<DistClientConfig>, Instant),
166     Disabled,
167 }
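// A rough sketch of the state transitions driven by the methods below
// (summarised for readability; the code itself is authoritative):
//
//   Some(cfg, client)       -- reset_state() or an HTTP client error --> RetryCreateAt(cfg, now - 1s)
//   FailWithMessage(cfg, _) -- get_client() returning the error ------> RetryCreateAt(cfg, now - 1s)
//   RetryCreateAt(cfg, t)   -- maybe_recreate_state(), once `t` passes -> create_state(cfg)
//   Disabled                -- no transitions; dist compilation stays off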
168 
169 #[cfg(not(feature = "dist-client"))]
170 impl DistClientContainer {
171     #[cfg(not(feature = "dist-client"))]
172     fn new(config: &Config, _: &ThreadPool) -> Self {
173         if config.dist.scheduler_url.is_some() {
174             warn!("Scheduler address configured but dist feature disabled, disabling distributed sccache")
175         }
176         Self {}
177     }
178 
179     pub fn new_disabled() -> Self {
180         Self {}
181     }
182 
183     pub fn reset_state(&self) {}
184 
185     pub fn get_status(&self) -> DistInfo {
186         DistInfo::Disabled("dist-client feature not selected".to_string())
187     }
188 
189     fn get_client(&self) -> Result<Option<Arc<dyn dist::Client>>> {
190         Ok(None)
191     }
192 }
193 
194 #[cfg(feature = "dist-client")]
195 impl DistClientContainer {
196     fn new(config: &Config, pool: &ThreadPool) -> Self {
197         let config = DistClientConfig {
198             pool: pool.clone(),
199             scheduler_url: config.dist.scheduler_url.clone(),
200             auth: config.dist.auth.clone(),
201             cache_dir: config.dist.cache_dir.clone(),
202             toolchain_cache_size: config.dist.toolchain_cache_size,
203             toolchains: config.dist.toolchains.clone(),
204             rewrite_includes_only: config.dist.rewrite_includes_only,
205         };
206         let state = Self::create_state(config);
207         Self {
208             state: Mutex::new(state),
209         }
210     }
211 
212     pub fn new_disabled() -> Self {
213         Self {
214             state: Mutex::new(DistClientState::Disabled),
215         }
216     }
217 
218     pub fn reset_state(&self) {
219         let mut guard = self.state.lock();
220         let state = guard.as_mut().unwrap();
221         let state: &mut DistClientState = &mut **state;
222         match mem::replace(state, DistClientState::Disabled) {
223             DistClientState::Some(cfg, _)
224             | DistClientState::FailWithMessage(cfg, _)
225             | DistClientState::RetryCreateAt(cfg, _) => {
226                 warn!("State reset. Will recreate");
227                 *state =
228                     DistClientState::RetryCreateAt(cfg, Instant::now() - Duration::from_secs(1));
229             }
230             DistClientState::Disabled => (),
231         }
232     }
233 
234     pub fn get_status(&self) -> DistInfo {
235         let mut guard = self.state.lock();
236         let state = guard.as_mut().unwrap();
237         let state: &mut DistClientState = &mut **state;
238         match state {
239             DistClientState::Disabled => DistInfo::Disabled("disabled".to_string()),
240             DistClientState::FailWithMessage(cfg, _) => DistInfo::NotConnected(
241                 cfg.scheduler_url.clone(),
242                 "enabled, auth not configured".to_string(),
243             ),
244             DistClientState::RetryCreateAt(cfg, _) => DistInfo::NotConnected(
245                 cfg.scheduler_url.clone(),
246                 "enabled, not connected, will retry".to_string(),
247             ),
248             DistClientState::Some(cfg, client) => match client.do_get_status().wait() {
249                 Ok(res) => DistInfo::SchedulerStatus(cfg.scheduler_url.clone(), res),
250                 Err(_) => DistInfo::NotConnected(
251                     cfg.scheduler_url.clone(),
252                     "could not communicate with scheduler".to_string(),
253                 ),
254             },
255         }
256     }
257 
258     fn get_client(&self) -> Result<Option<Arc<dyn dist::Client>>> {
259         let mut guard = self.state.lock();
260         let state = guard.as_mut().unwrap();
261         let state: &mut DistClientState = &mut **state;
262         Self::maybe_recreate_state(state);
263         let res = match state {
264             DistClientState::Some(_, dc) => Ok(Some(dc.clone())),
265             DistClientState::Disabled | DistClientState::RetryCreateAt(_, _) => Ok(None),
266             DistClientState::FailWithMessage(_, msg) => Err(anyhow!(msg.clone())),
267         };
268         if res.is_err() {
269             let config = match mem::replace(state, DistClientState::Disabled) {
270                 DistClientState::FailWithMessage(config, _) => config,
271                 _ => unreachable!(),
272             };
273             // The client is most likely mis-configured, make sure we
274             // re-create on our next attempt.
275             *state =
276                 DistClientState::RetryCreateAt(config, Instant::now() - Duration::from_secs(1));
277         }
278         res
279     }
280 
281     fn maybe_recreate_state(state: &mut DistClientState) {
282         if let DistClientState::RetryCreateAt(_, instant) = *state {
283             if instant > Instant::now() {
284                 return;
285             }
286             let config = match mem::replace(state, DistClientState::Disabled) {
287                 DistClientState::RetryCreateAt(config, _) => config,
288                 _ => unreachable!(),
289             };
290             info!("Attempting to recreate the dist client");
291             *state = Self::create_state(*config)
292         }
293     }
294 
295     // Attempt to create (or recreate) the dist client
296     fn create_state(config: DistClientConfig) -> DistClientState {
297         macro_rules! try_or_retry_later {
298             ($v:expr) => {{
299                 match $v {
300                     Ok(v) => v,
301                     Err(e) => {
302                         // `{:?}` prints the full cause chain and backtrace.
303                         error!("{:?}", e);
304                         return DistClientState::RetryCreateAt(
305                             Box::new(config),
306                             Instant::now() + DIST_CLIENT_RECREATE_TIMEOUT,
307                         );
308                     }
309                 }
310             }};
311         }
312 
313         macro_rules! try_or_fail_with_message {
314             ($v:expr) => {{
315                 match $v {
316                     Ok(v) => v,
317                     Err(e) => {
318                         // `{:?}` prints the full cause chain and backtrace.
319                         let errmsg = format!("{:?}", e);
320                         error!("{}", errmsg);
321                         return DistClientState::FailWithMessage(
322                             Box::new(config),
323                             errmsg,
324                         );
325                     }
326                 }
327             }};
328         }
329         match config.scheduler_url {
330             Some(ref addr) => {
331                 let url = addr.to_url();
332                 info!("Enabling distributed sccache to {}", url);
333                 let auth_token = match &config.auth {
334                     config::DistAuth::Token { token } => Ok(token.to_owned()),
335                     config::DistAuth::Oauth2CodeGrantPKCE { auth_url, .. }
336                     | config::DistAuth::Oauth2Implicit { auth_url, .. } => {
337                         Self::get_cached_config_auth_token(auth_url)
338                     }
339                 };
340                 let auth_token = try_or_fail_with_message!(auth_token
341                     .context("could not load client auth token, run |sccache --dist-auth|"));
342                 let dist_client = dist::http::Client::new(
343                     &config.pool,
344                     url,
345                     &config.cache_dir.join("client"),
346                     config.toolchain_cache_size,
347                     &config.toolchains,
348                     auth_token,
349                     config.rewrite_includes_only,
350                 );
351                 let dist_client =
352                     try_or_retry_later!(dist_client.context("failure during dist client creation"));
353                 use crate::dist::Client;
354                 match dist_client.do_get_status().wait() {
355                     Ok(res) => {
356                         info!(
357                             "Successfully created dist client with {:?} cores across {:?} servers",
358                             res.num_cpus, res.num_servers
359                         );
360                         DistClientState::Some(Box::new(config), Arc::new(dist_client))
361                     }
362                     Err(_) => {
363                         warn!("Scheduler address configured, but could not communicate with scheduler");
364                         DistClientState::RetryCreateAt(
365                             Box::new(config),
366                             Instant::now() + DIST_CLIENT_RECREATE_TIMEOUT,
367                         )
368                     }
369                 }
370             }
371             None => {
372                 info!("No scheduler address configured, disabling distributed sccache");
373                 DistClientState::Disabled
374             }
375         }
376     }
377 
378     fn get_cached_config_auth_token(auth_url: &str) -> Result<String> {
379         let cached_config = config::CachedConfig::reload()?;
380         cached_config
381             .with(|c| c.dist.auth_tokens.get(auth_url).map(String::to_owned))
382             .with_context(|| format!("token for url {} not present in cached config", auth_url))
383     }
384 }
385 
386 /// Start an sccache server, listening on `port`.
387 ///
388 /// Spins an event loop handling client connections until a client
389 /// requests a shutdown.
390 pub fn start_server(config: &Config, port: u16) -> Result<()> {
391     info!("start_server: port: {}", port);
392     let client = unsafe { Client::new() };
393     let runtime = Runtime::new()?;
394     let pool = ThreadPool::builder()
395         .pool_size(std::cmp::max(20, 2 * num_cpus::get()))
396         .create()?;
397     let dist_client = DistClientContainer::new(config, &pool);
398     let storage = storage_from_config(config, &pool);
399     let res = SccacheServer::<ProcessCommandCreator>::new(
400         port,
401         pool,
402         runtime,
403         client,
404         dist_client,
405         storage,
406     );
407     let notify = env::var_os("SCCACHE_STARTUP_NOTIFY");
408     match res {
409         Ok(srv) => {
410             let port = srv.port();
411             info!("server started, listening on port {}", port);
412             notify_server_startup(&notify, ServerStartup::Ok { port })?;
413             srv.run(future::empty::<(), ()>())?;
414             Ok(())
415         }
416         Err(e) => {
417             error!("failed to start server: {}", e);
418             match e.downcast_ref::<io::Error>() {
419                 Some(io_err) if io::ErrorKind::AddrInUse == io_err.kind() => {
420                     notify_server_startup(&notify, ServerStartup::AddrInUse)?;
421                 }
422                 _ => {
423                     let reason = e.to_string();
424                     notify_server_startup(&notify, ServerStartup::Err { reason })?;
425                 }
426             };
427             Err(e)
428         }
429     }
430 }
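// A minimal invocation sketch (hypothetical caller; constructing the `Config`
// is out of scope here, and 4226 is only used as an example port):
//
// ```ignore
// start_server(&config, 4226)?;
// ```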
431 
432 pub struct SccacheServer<C: CommandCreatorSync> {
433     runtime: Runtime,
434     listener: TcpListener,
435     rx: mpsc::Receiver<ServerMessage>,
436     timeout: Duration,
437     service: SccacheService<C>,
438     wait: WaitUntilZero,
439 }
440 
441 impl<C: CommandCreatorSync> SccacheServer<C> {
442     pub fn new(
443         port: u16,
444         pool: ThreadPool,
445         runtime: Runtime,
446         client: Client,
447         dist_client: DistClientContainer,
448         storage: Arc<dyn Storage>,
449     ) -> Result<SccacheServer<C>> {
450         let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
451         let listener = TcpListener::bind(&SocketAddr::V4(addr))?;
452 
453         // Prepare the service which we'll use to service all incoming TCP
454         // connections.
455         let (tx, rx) = mpsc::channel(1);
456         let (wait, info) = WaitUntilZero::new();
457         let service = SccacheService::new(dist_client, storage, &client, pool, tx, info);
458 
459         Ok(SccacheServer {
460             runtime,
461             listener,
462             rx,
463             service,
464             timeout: Duration::from_secs(get_idle_timeout()),
465             wait,
466         })
467     }
468 
469     /// Configures how long this server will be idle before shutting down.
470     #[allow(dead_code)]
471     pub fn set_idle_timeout(&mut self, timeout: Duration) {
472         self.timeout = timeout;
473     }
474 
475     /// Set the storage this server will use.
476     #[allow(dead_code)]
477     pub fn set_storage(&mut self, storage: Arc<dyn Storage>) {
478         self.service.storage = storage;
479     }
480 
481     /// Returns a reference to a thread pool to run work on
482     #[allow(dead_code)]
483     pub fn pool(&self) -> &ThreadPool {
484         &self.service.pool
485     }
486 
487     /// Returns a reference to the command creator this server will use
488     #[allow(dead_code)]
489     pub fn command_creator(&self) -> &C {
490         &self.service.creator
491     }
492 
493     /// Returns the port that this server is bound to
494     #[allow(dead_code)]
495     pub fn port(&self) -> u16 {
496         self.listener.local_addr().unwrap().port()
497     }
498 
499     /// Runs this server to completion.
500     ///
501     /// If the `shutdown` future resolves then the server will be shut down,
502     /// otherwise the server may naturally shut down if it becomes idle for too
503     /// long anyway.
504     pub fn run<F>(self, shutdown: F) -> io::Result<()>
505     where
506         F: Future,
507     {
508         self._run(Box::new(shutdown.then(|_| Ok(()))))
509     }
510 
511     fn _run<'a>(self, shutdown: Box<dyn Future<Item = (), Error = ()> + 'a>) -> io::Result<()> {
512         let SccacheServer {
513             mut runtime,
514             listener,
515             rx,
516             service,
517             timeout,
518             wait,
519         } = self;
520 
521         // Create our "server future" which will simply handle all incoming
522         // connections in separate tasks.
523         let server = listener.incoming().for_each(move |socket| {
524             trace!("incoming connection");
525             tokio_compat::runtime::current_thread::TaskExecutor::current()
526                 .spawn_local(Box::new(service.clone().bind(socket).map_err(|err| {
527                     error!("{}", err);
528                 })))
529                 .unwrap();
530             Ok(())
531         });
532 
533         // Right now there's a whole bunch of ways to shut down this server for
534         // various purposes. These include:
535         //
536         // 1. The `shutdown` future above.
537         // 2. An RPC indicating the server should shut down
538         // 3. A period of inactivity (no requests serviced)
539         //
540         // These are all encapsulated in the future that we're creating below.
541         // The `ShutdownOrInactive` indicates the RPC or the period of
542         // inactivity, and this is then select'd with the `shutdown` future
543         // passed to this function.
544 
545         let shutdown = shutdown.map(|a| {
546             info!("shutting down due to explicit signal");
547             a
548         });
549 
550         let mut futures = vec![
551             Box::new(server) as Box<dyn Future<Item = _, Error = _>>,
552             Box::new(
553                 shutdown
554                     .map_err(|()| io::Error::new(io::ErrorKind::Other, "shutdown signal failed")),
555             ),
556         ];
557 
558         let shutdown_idle = ShutdownOrInactive {
559             rx,
560             timeout: if timeout != Duration::new(0, 0) {
561                 Some(Delay::new(Instant::now() + timeout))
562             } else {
563                 None
564             },
565             timeout_dur: timeout,
566         };
567         futures.push(Box::new(shutdown_idle.map(|a| {
568             info!("shutting down due to being idle or request");
569             a
570         })));
571 
572         let server = future::select_all(futures);
573         runtime.block_on(server).map_err(|p| p.0)?;
574 
575         info!(
576             "moving into the shutdown phase now, waiting at most 30 seconds \
577              for all client requests to complete"
578         );
579 
580         // Once our server has shut down either due to inactivity or a manual
581         // request we still need to give a bit of time for all active
582         // connections to finish. This `wait` future will resolve once all
583         // instances of `SccacheService` have been dropped.
584         //
585         // Note that we cap the amount of time this can take, however, as we
586         // don't want to wait *too* long.
587         runtime
588             .block_on(Timeout::new(Compat::new(wait), Duration::new(30, 0)))
589             .map_err(|e| {
590                 if e.is_inner() {
591                     e.into_inner().unwrap()
592                 } else {
593                     io::Error::new(io::ErrorKind::Other, e)
594                 }
595             })?;
596 
597         info!("ok, fully shutting down now");
598 
599         Ok(())
600     }
601 }
602 
603 type CompilerMap<C> = HashMap<PathBuf, Option<CompilerCacheEntry<C>>>;
604 
605 /// An entry in the compiler cache.
606 struct CompilerCacheEntry<C: CommandCreatorSync> {
607     /// The boxed compiler trait object.
608     pub compiler: Box<dyn Compiler<C>>,
609     /// Modification time of the compiler's executable file.
610     pub mtime: FileTime,
611     /// Extra information for distributed compilation (custom toolchain archive path and its mtime).
612     pub dist_info: Option<(PathBuf, FileTime)>,
613 }
614 
615 impl<C> CompilerCacheEntry<C>
616 where
617     C: CommandCreatorSync,
618 {
619     fn new(
620         compiler: Box<dyn Compiler<C>>,
621         mtime: FileTime,
622         dist_info: Option<(PathBuf, FileTime)>,
623     ) -> Self {
624         Self {
625             compiler,
626             mtime,
627             dist_info,
628         }
629     }
630 }
631 /// Service implementation for sccache
632 #[derive(Clone)]
633 struct SccacheService<C: CommandCreatorSync> {
634     /// Server statistics.
635     stats: Rc<RefCell<ServerStats>>,
636 
637     /// Distributed sccache client
638     dist_client: Rc<DistClientContainer>,
639 
640     /// Cache storage.
641     storage: Arc<dyn Storage>,
642 
643     /// A cache of known compiler info.
644     compilers: Rc<RefCell<CompilerMap<C>>>,
645 
646     /// Maps a compiler proxy path to a proxy resolver, which dynamically
647     /// resolves the actual compiler to use for the current context (usually a
648     /// file or the current working directory).
649     /// The associated `FileTime` is the modification time of the compiler
650     /// proxy, used to track updates of the proxy itself.
651     compiler_proxies: Rc<RefCell<HashMap<PathBuf, (Box<dyn CompilerProxy<C>>, FileTime)>>>,
652 
653     /// Thread pool to execute work in
654     pool: ThreadPool,
655 
656     /// An object for creating commands.
657     ///
658     /// This is mostly useful for unit testing, where we
659     /// can mock this out.
660     creator: C,
661 
662     /// Message channel used to learn about requests received by this server.
663     ///
664     /// Note that messages sent along this channel will keep the server alive
665     /// (reset the idle timer) and this channel can also be used to shut down
666     /// the entire server immediately via a message.
667     tx: mpsc::Sender<ServerMessage>,
668 
669     /// Information tracking how many services (connected clients) are active.
670     info: ActiveInfo,
671 }
672 
673 type SccacheRequest = Message<Request, Body<()>>;
674 type SccacheResponse = Message<Response, Body<Response>>;
675 
676 /// Messages sent from all services to the main event loop indicating activity.
677 ///
678 /// Whenever a request is received, a `Request` message is sent, which resets
679 /// the idle shutdown timer; a `Shutdown` message indicates that
680 /// a server shutdown was requested via an RPC.
681 pub enum ServerMessage {
682     /// A message sent whenever a request is received.
683     Request,
684     /// Message sent whenever a shutdown request is received.
685     Shutdown,
686 }
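// These messages are consumed by the `ShutdownOrInactive` future used in
// `SccacheServer::_run` above: `Request` effectively resets the idle timeout,
// while `Shutdown` causes the server future to resolve so the server exits.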
687 
688 impl<C> Service<SccacheRequest> for SccacheService<C>
689 where
690     C: CommandCreatorSync + 'static,
691 {
692     type Response = SccacheResponse;
693     type Error = Error;
694     type Future = SFuture<Self::Response>;
695 
696     fn call(&mut self, req: SccacheRequest) -> Self::Future {
697         trace!("handle_client");
698 
699         // Opportunistically let channel know that we've received a request. We
700         // ignore failures here as well as backpressure as it's not imperative
701         // that every message is received.
702         drop(self.tx.clone().start_send(ServerMessage::Request));
703 
704         let res: SFuture<Response> = match req.into_inner() {
705             Request::Compile(compile) => {
706                 debug!("handle_client: compile");
707                 self.stats.borrow_mut().compile_requests += 1;
708                 return self.handle_compile(compile);
709             }
710             Request::GetStats => {
711                 debug!("handle_client: get_stats");
712                 Box::new(self.get_info().map(|i| Response::Stats(Box::new(i))))
713             }
714             Request::DistStatus => {
715                 debug!("handle_client: dist_status");
716                 Box::new(self.get_dist_status().map(Response::DistStatus))
717             }
718             Request::ZeroStats => {
719                 debug!("handle_client: zero_stats");
720                 self.zero_stats();
721                 Box::new(self.get_info().map(|i| Response::Stats(Box::new(i))))
722             }
723             Request::Shutdown => {
724                 debug!("handle_client: shutdown");
725                 let future = self
726                     .tx
727                     .clone()
728                     .send(ServerMessage::Shutdown)
729                     .then(|_| Ok(()));
730                 let info_future = self.get_info();
731                 return Box::new(future.join(info_future).map(move |(_, info)| {
732                     Message::WithoutBody(Response::ShuttingDown(Box::new(info)))
733                 }));
734             }
735         };
736 
737         Box::new(res.map(Message::WithoutBody))
738     }
739 
740     fn poll_ready(&mut self) -> Poll<(), Self::Error> {
741         Ok(Async::Ready(()))
742     }
743 }
744 
745 impl<C> SccacheService<C>
746 where
747     C: CommandCreatorSync,
748 {
749     pub fn new(
750         dist_client: DistClientContainer,
751         storage: Arc<dyn Storage>,
752         client: &Client,
753         pool: ThreadPool,
754         tx: mpsc::Sender<ServerMessage>,
755         info: ActiveInfo,
756     ) -> SccacheService<C> {
757         SccacheService {
758             stats: Rc::new(RefCell::new(ServerStats::default())),
759             dist_client: Rc::new(dist_client),
760             storage,
761             compilers: Rc::new(RefCell::new(HashMap::new())),
762             compiler_proxies: Rc::new(RefCell::new(HashMap::new())),
763             pool,
764             creator: C::new(client),
765             tx,
766             info,
767         }
768     }
769 
770     fn bind<T>(mut self, socket: T) -> impl Future<Item = (), Error = Error>
771     where
772         T: AsyncRead + AsyncWrite + 'static,
773     {
774         let mut builder = length_delimited::Builder::new();
775         if let Ok(max_frame_length_str) = env::var("SCCACHE_MAX_FRAME_LENGTH") {
776             if let Ok(max_frame_length) = max_frame_length_str.parse::<usize>() {
777                 builder.max_frame_length(max_frame_length);
778             } else {
779                 warn!("Content of SCCACHE_MAX_FRAME_LENGTH is not a valid number, using default");
780             }
781         }
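        // Frame the raw socket: each message travels as a length-prefixed,
        // bincode-encoded value (tokio's `length_delimited` codec wrapped by
        // the `ReadBincode`/`WriteBincode` adapters below).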
782         let io = builder.new_framed(socket);
783 
784         let (sink, stream) = SccacheTransport {
785             inner: WriteBincode::new(ReadBincode::new(io)),
786         }
787         .split();
788         let sink = sink.sink_from_err::<Error>();
789 
790         stream
791             .from_err::<Error>()
792             .and_then(move |input| self.call(input))
793             .and_then(|message| {
794                 let f: Box<dyn Stream<Item = _, Error = _>> = match message {
795                     Message::WithoutBody(message) => Box::new(stream::once(Ok(Frame::Message {
796                         message,
797                         body: false,
798                     }))),
799                     Message::WithBody(message, body) => Box::new(
800                         stream::once(Ok(Frame::Message {
801                             message,
802                             body: true,
803                         }))
804                         .chain(Compat::new(body).map(|chunk| Frame::Body { chunk: Some(chunk) }))
805                         .chain(stream::once(Ok(Frame::Body { chunk: None }))),
806                     ),
807                 };
808                 Ok(f.from_err::<Error>())
809             })
810             .flatten()
811             .forward(sink)
812             .map(|_| ())
813     }
814 
815     /// Get dist status.
816     fn get_dist_status(&self) -> SFuture<DistInfo> {
817         f_ok(self.dist_client.get_status())
818     }
819 
820     /// Get info and stats about the cache.
821     fn get_info(&self) -> SFuture<ServerInfo> {
822         let stats = self.stats.borrow().clone();
823         let cache_location = self.storage.location();
824         Box::new(
825             self.storage
826                 .current_size()
827                 .join(self.storage.max_size())
828                 .map(move |(cache_size, max_cache_size)| ServerInfo {
829                     stats,
830                     cache_location,
831                     cache_size,
832                     max_cache_size,
833                 }),
834         )
835     }
836 
837     /// Zero stats about the cache.
838     fn zero_stats(&self) {
839         *self.stats.borrow_mut() = ServerStats::default();
840     }
841 
842     /// Handle a compile request from a client.
843     ///
844     /// This will handle a compile request entirely, generating a response with
845     /// the initial information and an optional body which will eventually
846     /// contain the results of the compilation.
847     fn handle_compile(&self, compile: Compile) -> SFuture<SccacheResponse> {
848         let exe = compile.exe;
849         let cmd = compile.args;
850         let cwd: PathBuf = compile.cwd.into();
851         let env_vars = compile.env_vars;
852         let me = self.clone();
853 
854         Box::new(
855             self.compiler_info(exe.into(), cwd.clone(), &env_vars)
856                 .map(move |info| me.check_compiler(info, cmd, cwd, env_vars)),
857         )
858     }
859 
860     /// Look up compiler info from the cache for the compiler `path`.
861     /// If not cached, determine the compiler type and cache the result.
862     fn compiler_info(
863         &self,
864         path: PathBuf,
865         cwd: PathBuf,
866         env: &[(OsString, OsString)],
867     ) -> SFuture<Result<Box<dyn Compiler<C>>>> {
868         trace!("compiler_info");
869 
870         let me = self.clone();
871         let me1 = self.clone();
872 
873         // Look up whether a compiler proxy exists for the current compiler path
874 
875         let path2 = path.clone();
876         let path1 = path.clone();
877         let env = env.to_vec();
878 
879         let resolve_w_proxy = {
880             let compiler_proxies_borrow = self.compiler_proxies.borrow();
881 
882             if let Some((compiler_proxy, _filetime)) = compiler_proxies_borrow.get(&path) {
883                 let fut = compiler_proxy.resolve_proxied_executable(
884                     self.creator.clone(),
885                     cwd.clone(),
886                     env.as_slice(),
887                 );
888                 Box::new(fut.then(|res: Result<_>| Ok(res.ok())))
889             } else {
890                 f_ok(None)
891             }
892         };
893 
894         // Use the supplied compiler path as a fallback, and look up its modification time too
895         let w_fallback = resolve_w_proxy.then(move |res: Result<Option<(PathBuf, FileTime)>>| {
896             let opt = match res {
897                 Ok(Some(x)) => Some(x), // TODO resolve the path right away
898                 _ => {
899                     // fallback to using the path directly
900                     metadata(&path2)
901                         .map(|attr| FileTime::from_last_modification_time(&attr))
902                         .ok()
903                         .map(move |filetime| (path2, filetime))
904                 }
905             };
906             f_ok(opt)
907         });
908 
909         let lookup_compiler = w_fallback.and_then(move |opt: Option<(PathBuf, FileTime)>| {
910             let (resolved_compiler_path, mtime) =
911                 opt.expect("Must contain sane data, otherwise mtime is not avail");
912 
913             let dist_info = match me1.dist_client.get_client() {
914                 Ok(Some(ref client)) => {
915                     if let Some(archive) = client.get_custom_toolchain(&resolved_compiler_path) {
916                         match metadata(&archive)
917                             .map(|attr| FileTime::from_last_modification_time(&attr))
918                         {
919                             Ok(mtime) => Some((archive, mtime)),
920                             _ => None,
921                         }
922                     } else {
923                         None
924                     }
925                 }
926                 _ => None,
927             };
928 
929             let opt = match me1.compilers.borrow().get(&resolved_compiler_path) {
930             // It's a hit only if the mtime and dist archive data match.
931                 Some(&Some(ref entry)) => {
932                     if entry.mtime == mtime && entry.dist_info == dist_info {
933                         Some(entry.compiler.clone())
934                     } else {
935                         None
936                     }
937                 }
938                 _ => None,
939             };
940             f_ok((resolved_compiler_path, mtime, opt, dist_info))
941         });
942 
943         let obtain = lookup_compiler.and_then(
944             move |(resolved_compiler_path, mtime, opt, dist_info): (
945                 PathBuf,
946                 FileTime,
947                 Option<Box<dyn Compiler<C>>>,
948                 Option<(PathBuf, FileTime)>,
949             )| {
950                 match opt {
951                     Some(info) => {
952                         trace!("compiler_info cache hit");
953                         f_ok(Ok(info))
954                     }
955                     None => {
956                         trace!("compiler_info cache miss");
957                         // Check the compiler type and return the result when
958                         // finished. This generally involves invoking the compiler,
959                         // so do it asynchronously.
960 
961                         // The compiler path might be a compiler proxy, so it is important to
962                         // resolve using `path` (or its clone `path1`), not `resolved_compiler_path`
963                         let x = get_compiler_info::<C>(
964                             me.creator.clone(),
965                             &path1,
966                             &cwd,
967                             env.as_slice(),
968                             &me.pool,
969                             dist_info.clone().map(|(p, _)| p),
970                         );
971 
972                         Box::new(x.then(
973                             move |info: Result<(
974                                 Box<dyn Compiler<C>>,
975                                 Option<Box<dyn CompilerProxy<C>>>,
976                             )>| {
977                                 match info {
978                                     Ok((ref c, ref proxy)) => {
979                                         // register the proxy for this compiler, so it will be used directly from now on
980                                         // and the true/resolved compiler will create table hits in the hash map
981                                         // based on the resolved path
982                                         if let Some(proxy) = proxy {
983                                             trace!(
984                                                 "Inserting new path proxy {:?} @ {:?} -> {:?}",
985                                                 &path,
986                                                 &cwd,
987                                                 resolved_compiler_path
988                                             );
989                                             let proxy: Box<dyn CompilerProxy<C>> =
990                                                 proxy.box_clone();
991                                             me.compiler_proxies
992                                                 .borrow_mut()
993                                                 .insert(path, (proxy, mtime));
994                                         }
995                                         // TODO add some safety checks in case a proxy exists, that the initial `path` is not
996                                         // TODO the same as the resolved compiler binary
997 
998                                         // cache
999                                         let map_info =
1000                                             CompilerCacheEntry::new(c.clone(), mtime, dist_info);
1001                                         trace!(
1002                                             "Inserting POSSIBLY PROXIED cache map info for {:?}",
1003                                             &resolved_compiler_path
1004                                         );
1005                                         me.compilers
1006                                             .borrow_mut()
1007                                             .insert(resolved_compiler_path, Some(map_info));
1008                                     }
1009                                     Err(_) => {
1010                                         trace!("Inserting PLAIN cache map info for {:?}", &path);
1011                                         me.compilers.borrow_mut().insert(path, None);
1012                                     }
1013                                 }
1014                                 // drop the proxy information, response is compiler only
1015                                 let r: Result<Box<dyn Compiler<C>>> = info.map(|info| info.0);
1016                                 f_ok(r)
1017                             },
1018                         ))
1019                     }
1020                 }
1021             },
1022         );
1023 
1024         Box::new(obtain)
1025     }
1026 
1027     /// Check that we can handle and cache `cmd` when run with `compiler`.
1028     /// If so, run `start_compile_task` to execute it.
1029     fn check_compiler(
1030         &self,
1031         compiler: Result<Box<dyn Compiler<C>>>,
1032         cmd: Vec<OsString>,
1033         cwd: PathBuf,
1034         env_vars: Vec<(OsString, OsString)>,
1035     ) -> SccacheResponse {
1036         let mut stats = self.stats.borrow_mut();
1037         match compiler {
1038             Err(e) => {
1039                 debug!("check_compiler: Unsupported compiler: {}", e.to_string());
1040                 stats.requests_unsupported_compiler += 1;
1041                 return Message::WithoutBody(Response::Compile(
1042                     CompileResponse::UnsupportedCompiler(OsString::from(e.to_string())),
1043                 ));
1044             }
1045             Ok(c) => {
1046                 debug!("check_compiler: Supported compiler");
1047                 // Now check that we can handle this compiler with
1048                 // the provided commandline.
1049                 match c.parse_arguments(&cmd, &cwd) {
1050                     CompilerArguments::Ok(hasher) => {
1051                         debug!("parse_arguments: Ok: {:?}", cmd);
1052                         stats.requests_executed += 1;
1053                         let (tx, rx) = Body::pair();
1054                         self.start_compile_task(c, hasher, cmd, cwd, env_vars, tx);
1055                         let res = CompileResponse::CompileStarted;
1056                         return Message::WithBody(Response::Compile(res), rx);
1057                     }
1058                     CompilerArguments::CannotCache(why, extra_info) => {
1059                         if let Some(extra_info) = extra_info {
1060                             debug!(
1061                                 "parse_arguments: CannotCache({}, {}): {:?}",
1062                                 why, extra_info, cmd
1063                             )
1064                         } else {
1065                             debug!("parse_arguments: CannotCache({}): {:?}", why, cmd)
1066                         }
1067                         stats.requests_not_cacheable += 1;
1068                         *stats.not_cached.entry(why.to_string()).or_insert(0) += 1;
1069                     }
1070                     CompilerArguments::NotCompilation => {
1071                         debug!("parse_arguments: NotCompilation: {:?}", cmd);
1072                         stats.requests_not_compile += 1;
1073                     }
1074                 }
1075             }
1076         }
1077 
1078         let res = CompileResponse::UnhandledCompile;
1079         Message::WithoutBody(Response::Compile(res))
1080     }
1081 
1082     /// Given compiler arguments `arguments`, look up
1083     /// a compile result in the cache or execute the compilation and store
1084     /// the result in the cache.
1085     fn start_compile_task(
1086         &self,
1087         compiler: Box<dyn Compiler<C>>,
1088         hasher: Box<dyn CompilerHasher<C>>,
1089         arguments: Vec<OsString>,
1090         cwd: PathBuf,
1091         env_vars: Vec<(OsString, OsString)>,
1092         tx: mpsc::Sender<Result<Response>>,
1093     ) {
1094         let force_recache = env_vars
1095             .iter()
1096             .any(|&(ref k, ref _v)| k.as_os_str() == OsStr::new("SCCACHE_RECACHE"));
1097         let cache_control = if force_recache {
1098             CacheControl::ForceRecache
1099         } else {
1100             CacheControl::Default
1101         };
1102         let out_pretty = hasher.output_pretty().into_owned();
1103         let color_mode = hasher.color_mode();
1104         let result = hasher.get_cached_or_compile(
1105             self.dist_client.get_client(),
1106             self.creator.clone(),
1107             self.storage.clone(),
1108             arguments,
1109             cwd,
1110             env_vars,
1111             cache_control,
1112             self.pool.clone(),
1113         );
1114         let me = self.clone();
1115         let kind = compiler.kind();
1116         let task = result.then(move |result| {
1117             let mut cache_write = None;
1118             let mut stats = me.stats.borrow_mut();
1119             let mut res = CompileFinished {
1120                 color_mode,
1121                 ..Default::default()
1122             };
1123             match result {
1124                 Ok((compiled, out)) => {
1125                     match compiled {
1126                         CompileResult::Error => {
1127                             stats.cache_errors.increment(&kind);
1128                         }
1129                         CompileResult::CacheHit(duration) => {
1130                             stats.cache_hits.increment(&kind);
1131                             stats.cache_read_hit_duration += duration;
1132                         }
1133                         CompileResult::CacheMiss(miss_type, dist_type, duration, future) => {
1134                             match dist_type {
1135                                 DistType::NoDist => {}
1136                                 DistType::Ok(id) => {
1137                                     let server = id.addr().to_string();
1138                                     let server_count =
1139                                         stats.dist_compiles.entry(server).or_insert(0);
1140                                     *server_count += 1;
1141                                 }
1142                                 DistType::Error => stats.dist_errors += 1,
1143                             }
1144                             match miss_type {
1145                                 MissType::Normal => {}
1146                                 MissType::ForcedRecache => {
1147                                     stats.forced_recaches += 1;
1148                                 }
1149                                 MissType::TimedOut => {
1150                                     stats.cache_timeouts += 1;
1151                                 }
1152                                 MissType::CacheReadError => {
1153                                     stats.cache_errors.increment(&kind);
1154                                 }
1155                             }
1156                             stats.cache_misses.increment(&kind);
1157                             stats.cache_read_miss_duration += duration;
1158                             cache_write = Some(future);
1159                         }
1160                         CompileResult::NotCacheable => {
1161                             stats.cache_misses.increment(&kind);
1162                             stats.non_cacheable_compilations += 1;
1163                         }
1164                         CompileResult::CompileFailed => {
1165                             stats.compile_fails += 1;
1166                         }
1167                     };
1168                     let Output {
1169                         status,
1170                         stdout,
1171                         stderr,
1172                     } = out;
1173                     trace!("CompileFinished retcode: {}", status);
1174                     match status.code() {
1175                         Some(code) => res.retcode = Some(code),
1176                         None => res.signal = Some(get_signal(status)),
1177                     };
1178                     res.stdout = stdout;
1179                     res.stderr = stderr;
1180                 }
1181                 Err(err) => {
1182                     match err.downcast::<ProcessError>() {
1183                         Ok(ProcessError(output)) => {
1184                             debug!("Compilation failed: {:?}", output);
1185                             stats.compile_fails += 1;
1186                             match output.status.code() {
1187                                 Some(code) => res.retcode = Some(code),
1188                                 None => res.signal = Some(get_signal(output.status)),
1189                             };
1190                             res.stdout = output.stdout;
1191                             res.stderr = output.stderr;
1192                         }
1193                         Err(err) => match err.downcast::<HttpClientError>() {
1194                             Ok(HttpClientError(msg)) => {
1195                                 me.dist_client.reset_state();
1196                                 let errmsg =
1197                                     format!("[{:?}] http error status: {}", out_pretty, msg);
1198                                 error!("{}", errmsg);
1199                                 res.retcode = Some(1);
1200                                 res.stderr = errmsg.as_bytes().to_vec();
1201                             }
1202                             Err(err) => {
1203                                 use std::fmt::Write;
1204 
1205                                 error!("[{:?}] fatal error: {}", out_pretty, err);
1206 
1207                                 let mut error = "sccache: encountered fatal error\n".to_string();
1208                                 let _ = writeln!(error, "sccache: error: {}", err);
1209                                 for e in err.chain() {
1210                                     error!("[{:?}] \t{}", out_pretty, e);
1211                                     let _ = writeln!(error, "sccache: caused by: {}", e);
1212                                 }
1213                                 stats.cache_errors.increment(&kind);
1214                                 //TODO: figure out a better way to communicate this?
1215                                 res.retcode = Some(-2);
1216                                 res.stderr = error.into_bytes();
1217                             }
1218                         },
1219                     }
1220                 }
1221             };
1222             let send = tx.send(Ok(Response::CompileFinished(res)));
1223 
1224             let me = me.clone();
1225             let cache_write = cache_write.then(move |result| {
1226                 match result {
1227                     Err(e) => {
1228                         debug!("Error executing cache write: {}", e);
1229                         me.stats.borrow_mut().cache_write_errors += 1;
1230                     }
1231                     //TODO: save cache stats!
1232                     Ok(Some(info)) => {
1233                         debug!(
1234                             "[{}]: Cache write finished in {}",
1235                             info.object_file_pretty,
1236                             util::fmt_duration_as_secs(&info.duration)
1237                         );
1238                         me.stats.borrow_mut().cache_writes += 1;
1239                         me.stats.borrow_mut().cache_write_duration += info.duration;
1240                     }
1241 
1242                     Ok(None) => {}
1243                 }
1244                 Ok(())
1245             });
1246 
1247             send.join(cache_write).then(|_| Ok(()))
1248         });
1249 
1250         tokio_compat::runtime::current_thread::TaskExecutor::current()
1251             .spawn_local(Box::new(task))
1252             .unwrap();
1253     }
1254 }
1255 
1256 #[derive(Serialize, Deserialize, Debug, Clone, Default)]
1257 pub struct PerLanguageCount {
1258     counts: HashMap<String, u64>,
1259 }
1260 
1261 impl PerLanguageCount {
1262     fn increment(&mut self, kind: &CompilerKind) {
1263         let key = kind.lang_kind();
1264         let count = self.counts.entry(key).or_insert(0);
1265         *count += 1;
1266     }
1267 
1268     pub fn all(&self) -> u64 {
1269         self.counts.values().sum()
1270     }
1271 
1272     pub fn get(&self, key: &str) -> Option<&u64> {
1273         self.counts.get(key)
1274     }
1275 
1276     pub fn new() -> PerLanguageCount {
1277         Self::default()
1278     }
1279 }
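// A usage sketch for `PerLanguageCount` (illustrative; `kind` stands for any
// `CompilerKind` value, whose `lang_kind()` supplies the per-language key):
//
// ```ignore
// let mut hits = PerLanguageCount::new();
// hits.increment(&kind);
// assert_eq!(hits.all(), 1);
// assert_eq!(hits.get(&kind.lang_kind()), Some(&1));
// ```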

/// Statistics about the server.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ServerStats {
    /// The count of client compile requests.
    pub compile_requests: u64,
    /// The count of client requests that used an unsupported compiler.
    pub requests_unsupported_compiler: u64,
    /// The count of client requests that were not compilation.
    pub requests_not_compile: u64,
    /// The count of client requests that were not cacheable.
    pub requests_not_cacheable: u64,
    /// The count of client requests that were executed.
    pub requests_executed: u64,
    /// The count of errors handling compile requests (per language).
    pub cache_errors: PerLanguageCount,
    /// The count of cache hits for handled compile requests (per language).
    pub cache_hits: PerLanguageCount,
    /// The count of cache misses for handled compile requests (per language).
    pub cache_misses: PerLanguageCount,
    /// The count of cache misses because the cache took too long to respond.
    pub cache_timeouts: u64,
    /// The count of errors reading cache entries.
    pub cache_read_errors: u64,
    /// The count of compilations which were successful but couldn't be cached.
    pub non_cacheable_compilations: u64,
    /// The count of compilations which forcibly ignored the cache.
    pub forced_recaches: u64,
    /// The count of errors writing to cache.
    pub cache_write_errors: u64,
    /// The number of successful cache writes.
    pub cache_writes: u64,
    /// The total time spent writing cache entries.
    pub cache_write_duration: Duration,
    /// The total time spent reading cache hits.
    pub cache_read_hit_duration: Duration,
    /// The total time spent reading cache misses.
    pub cache_read_miss_duration: Duration,
    /// The count of compilation failures.
    pub compile_fails: u64,
    /// Counts of reasons why compiles were not cached.
    pub not_cached: HashMap<String, usize>,
    /// The count of compilations that were successfully distributed, indexed
    /// by the server that ran those compilations.
    pub dist_compiles: HashMap<String, usize>,
    /// The count of compilations that were distributed but failed and had to be re-run locally.
    pub dist_errors: u64,
}

/// Info and stats about the server.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct ServerInfo {
    pub stats: ServerStats,
    pub cache_location: String,
    pub cache_size: Option<u64>,
    pub max_cache_size: Option<u64>,
}

/// Status of the dist client.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum DistInfo {
    Disabled(String),
    #[cfg(feature = "dist-client")]
    NotConnected(Option<config::HTTPUrl>, String),
    #[cfg(feature = "dist-client")]
    SchedulerStatus(Option<config::HTTPUrl>, dist::SchedulerStatusResult),
}

impl Default for ServerStats {
    fn default() -> ServerStats {
        ServerStats {
            compile_requests: u64::default(),
            requests_unsupported_compiler: u64::default(),
            requests_not_compile: u64::default(),
            requests_not_cacheable: u64::default(),
            requests_executed: u64::default(),
            cache_errors: PerLanguageCount::new(),
            cache_hits: PerLanguageCount::new(),
            cache_misses: PerLanguageCount::new(),
            cache_timeouts: u64::default(),
            cache_read_errors: u64::default(),
            non_cacheable_compilations: u64::default(),
            forced_recaches: u64::default(),
            cache_write_errors: u64::default(),
            cache_writes: u64::default(),
            cache_write_duration: Duration::new(0, 0),
            cache_read_hit_duration: Duration::new(0, 0),
            cache_read_miss_duration: Duration::new(0, 0),
            compile_fails: u64::default(),
            not_cached: HashMap::new(),
            dist_compiles: HashMap::new(),
            dist_errors: u64::default(),
        }
    }
}

impl ServerStats {
    /// Print stats to stdout in a human-readable format.
    ///
    /// Return the formatted width of each of the (name, value) columns.
    fn print(&self) -> (usize, usize) {
        macro_rules! set_stat {
            ($vec:ident, $var:expr, $name:expr) => {{
                // name, value, suffix length
                $vec.push(($name.to_string(), $var.to_string(), 0));
            }};
        }

        macro_rules! set_lang_stat {
            ($vec:ident, $var:expr, $name:expr) => {{
                $vec.push(($name.to_string(), $var.all().to_string(), 0));
                let mut sorted_stats: Vec<_> = $var.counts.iter().collect();
                sorted_stats.sort_by_key(|v| v.0);
                for (lang, count) in sorted_stats.iter() {
                    $vec.push((format!("{} ({})", $name, lang), count.to_string(), 0));
                }
            }};
        }

        macro_rules! set_duration_stat {
            ($vec:ident, $dur:expr, $num:expr, $name:expr) => {{
                let s = if $num > 0 {
                    $dur / $num as u32
                } else {
                    Default::default()
                };
                // name, value, suffix length
                $vec.push(($name.to_string(), util::fmt_duration_as_secs(&s), 2));
            }};
        }

        let mut stats_vec = vec![];
        //TODO: this would be nice to replace with a custom derive implementation.
        set_stat!(stats_vec, self.compile_requests, "Compile requests");
        set_stat!(
            stats_vec,
            self.requests_executed,
            "Compile requests executed"
        );
        set_lang_stat!(stats_vec, self.cache_hits, "Cache hits");
        set_lang_stat!(stats_vec, self.cache_misses, "Cache misses");
        set_stat!(stats_vec, self.cache_timeouts, "Cache timeouts");
        set_stat!(stats_vec, self.cache_read_errors, "Cache read errors");
        set_stat!(stats_vec, self.forced_recaches, "Forced recaches");
        set_stat!(stats_vec, self.cache_write_errors, "Cache write errors");
        set_stat!(stats_vec, self.compile_fails, "Compilation failures");
        set_lang_stat!(stats_vec, self.cache_errors, "Cache errors");
        set_stat!(
            stats_vec,
            self.non_cacheable_compilations,
            "Non-cacheable compilations"
        );
        set_stat!(
            stats_vec,
            self.requests_not_cacheable,
            "Non-cacheable calls"
        );
        set_stat!(
            stats_vec,
            self.requests_not_compile,
            "Non-compilation calls"
        );
        set_stat!(
            stats_vec,
            self.requests_unsupported_compiler,
            "Unsupported compiler calls"
        );
        set_duration_stat!(
            stats_vec,
            self.cache_write_duration,
            self.cache_writes,
            "Average cache write"
        );
        set_duration_stat!(
            stats_vec,
            self.cache_read_miss_duration,
            self.cache_misses.all(),
            "Average cache read miss"
        );
        set_duration_stat!(
            stats_vec,
            self.cache_read_hit_duration,
            self.cache_hits.all(),
            "Average cache read hit"
        );
        set_stat!(
            stats_vec,
            self.dist_errors,
            "Failed distributed compilations"
        );
        let name_width = stats_vec
            .iter()
            .map(|&(ref n, _, _)| n.len())
            .max()
            .unwrap();
        let stat_width = stats_vec
            .iter()
            .map(|&(_, ref s, _)| s.len())
            .max()
            .unwrap();
        for (name, stat, suffix_len) in stats_vec {
            println!(
                "{:<name_width$} {:>stat_width$}",
                name,
                stat,
                name_width = name_width,
                stat_width = stat_width + suffix_len
            );
        }
        if !self.dist_compiles.is_empty() {
            println!("\nSuccessful distributed compiles");
            let mut counts: Vec<_> = self.dist_compiles.iter().collect();
            counts.sort_by(|(_, c1), (_, c2)| c1.cmp(c2).reverse());
            for (reason, count) in counts {
                println!(
                    "  {:<name_width$} {:>stat_width$}",
                    reason,
                    count,
                    name_width = name_width - 2,
                    stat_width = stat_width
                );
            }
        }
        if !self.not_cached.is_empty() {
            println!("\nNon-cacheable reasons:");
            let mut counts: Vec<_> = self.not_cached.iter().collect();
            counts.sort_by(|(_, c1), (_, c2)| c1.cmp(c2).reverse());
            for (reason, count) in counts {
                println!(
                    "{:<name_width$} {:>stat_width$}",
                    reason,
                    count,
                    name_width = name_width,
                    stat_width = stat_width
                );
            }
            println!();
        }
        (name_width, stat_width)
    }
}

impl ServerInfo {
    /// Print info to stdout in a human-readable format.
    pub fn print(&self) {
        let (name_width, stat_width) = self.stats.print();
        println!(
            "{:<name_width$} {}",
            "Cache location",
            self.cache_location,
            name_width = name_width
        );
        for &(name, val) in &[
            ("Cache size", &self.cache_size),
            ("Max cache size", &self.max_cache_size),
        ] {
            if let Some(val) = *val {
                let (val, suffix) = match NumberPrefix::binary(val as f64) {
                    NumberPrefix::Standalone(bytes) => (bytes.to_string(), "bytes".to_string()),
                    NumberPrefix::Prefixed(prefix, n) => {
                        (format!("{:.0}", n), format!("{}B", prefix))
                    }
                };
                println!(
                    "{:<name_width$} {:>stat_width$} {}",
                    name,
                    val,
                    suffix,
                    name_width = name_width,
                    stat_width = stat_width
                );
            }
        }
    }
}

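/// A protocol frame in the shape tokio-proto's streaming pipeline expects:
/// either a message (optionally announcing a body) or a body chunk.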
enum Frame<R, R1> {
    Body { chunk: Option<R1> },
    Message { message: R, body: bool },
}

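/// A streaming body backed by an in-memory channel; `pair` hands back the
/// sending half alongside the `Body` itself.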
struct Body<R> {
    receiver: mpsc::Receiver<Result<R>>,
}

impl<R> Body<R> {
    fn pair() -> (mpsc::Sender<Result<R>>, Self) {
        let (tx, rx) = mpsc::channel(0);
        (tx, Body { receiver: rx })
    }
}

impl<R> futures_03::Stream for Body<R> {
    type Item = Result<R>;
    fn poll_next(
        mut self: Pin<&mut Self>,
        _cx: &mut Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        match Pin::new(&mut self.receiver).poll().unwrap() {
            Async::Ready(item) => std::task::Poll::Ready(item),
            Async::NotReady => std::task::Poll::Pending,
        }
    }
}

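/// A protocol message that may or may not carry a streaming body.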
enum Message<R, B> {
    WithBody(R, B),
    WithoutBody(R),
}

impl<R, B> Message<R, B> {
    fn into_inner(self) -> R {
        match self {
            Message::WithBody(r, _) => r,
            Message::WithoutBody(r) => r,
        }
    }
}

/// Implementation of `Stream + Sink` that tokio-proto is expecting
///
/// This type is composed of a few layers:
///
/// * First there's `I`, the I/O object implementing `AsyncRead` and
///   `AsyncWrite`
/// * Next that's framed using the `length_delimited` module in tokio-io giving
///   us a `Sink` and `Stream` of `BytesMut`.
/// * Next that sink/stream is wrapped in `ReadBincode` which will cause the
///   `Stream` implementation to switch from `BytesMut` to `Request` by parsing
///   the bytes as bincode.
/// * Finally that sink/stream is wrapped in `WriteBincode` which will cause the
///   `Sink` implementation to switch from `BytesMut` to `Response` meaning that
///   all `Response` types pushed in will be converted to `BytesMut` and pushed
///   below.
struct SccacheTransport<I: AsyncRead + AsyncWrite> {
    inner: WriteBincode<ReadBincode<Framed<I>, Request>, Response>,
}
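
// A rough sketch of how the layers described above stack up, reading inside
// out. This is illustrative only; the constructor calls (`Framed::new`,
// `ReadBincode::new`, `WriteBincode::new`) are assumptions about the
// tokio-io 0.1 / tokio-serde-bincode APIs, not a quote of the actual
// construction site elsewhere in this file:
//
//     let framed = Framed::new(io);           // Stream + Sink of BytesMut
//     let read = ReadBincode::new(framed);    // Stream of Request
//     let inner = WriteBincode::new(read);    // Sink of Response
//     let transport = SccacheTransport { inner };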

impl<I: AsyncRead + AsyncWrite> Stream for SccacheTransport<I> {
    type Item = Message<Request, Body<()>>;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
        let msg = try_ready!(self.inner.poll().map_err(|e| {
            error!("SccacheTransport::poll failed: {}", e);
            io::Error::new(io::ErrorKind::Other, e)
        }));
        Ok(msg.map(Message::WithoutBody).into())
    }
}

impl<I: AsyncRead + AsyncWrite> Sink for SccacheTransport<I> {
    type SinkItem = Frame<Response, Response>;
    type SinkError = io::Error;

    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, io::Error> {
        match item {
            Frame::Message { message, body } => match self.inner.start_send(message)? {
                AsyncSink::Ready => Ok(AsyncSink::Ready),
                AsyncSink::NotReady(message) => {
                    Ok(AsyncSink::NotReady(Frame::Message { message, body }))
                }
            },
            Frame::Body { chunk: Some(chunk) } => match self.inner.start_send(chunk)? {
                AsyncSink::Ready => Ok(AsyncSink::Ready),
                AsyncSink::NotReady(chunk) => {
                    Ok(AsyncSink::NotReady(Frame::Body { chunk: Some(chunk) }))
                }
            },
            Frame::Body { chunk: None } => Ok(AsyncSink::Ready),
        }
    }

    fn poll_complete(&mut self) -> Poll<(), io::Error> {
        self.inner.poll_complete()
    }

    fn close(&mut self) -> Poll<(), io::Error> {
        self.inner.close()
    }
}

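/// Future that resolves either when an explicit `Shutdown` message arrives or
/// when the idle timeout elapses without a request (the timeout is only
/// re-armed when `timeout_dur` is non-zero).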
struct ShutdownOrInactive {
    rx: mpsc::Receiver<ServerMessage>,
    timeout: Option<Delay>,
    timeout_dur: Duration,
}

impl Future for ShutdownOrInactive {
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> Poll<(), io::Error> {
        loop {
            match self.rx.poll().unwrap() {
                Async::NotReady => break,
                // Shutdown received!
                Async::Ready(Some(ServerMessage::Shutdown)) => return Ok(().into()),
                Async::Ready(Some(ServerMessage::Request)) => {
                    if self.timeout_dur != Duration::new(0, 0) {
                        self.timeout = Some(Delay::new(Instant::now() + self.timeout_dur));
                    }
                }
                // All services have shut down; in theory this isn't possible...
                Async::Ready(None) => return Ok(().into()),
            }
        }
        match self.timeout {
            None => Ok(Async::NotReady),
            Some(ref mut timeout) => timeout
                .poll()
                .map_err(|err| io::Error::new(io::ErrorKind::Other, err)),
        }
    }
}

/// Helper future which tracks the `ActiveInfo` below. This future will resolve
/// once all instances of `ActiveInfo` have been dropped.
struct WaitUntilZero {
    info: Rc<RefCell<Info>>,
}

struct ActiveInfo {
    info: Rc<RefCell<Info>>,
}

struct Info {
    active: usize,
    waker: Option<Waker>,
}

impl WaitUntilZero {
    fn new() -> (WaitUntilZero, ActiveInfo) {
        let info = Rc::new(RefCell::new(Info {
            active: 1,
            waker: None,
        }));

        (WaitUntilZero { info: info.clone() }, ActiveInfo { info })
    }
}

impl Clone for ActiveInfo {
    fn clone(&self) -> ActiveInfo {
        self.info.borrow_mut().active += 1;
        ActiveInfo {
            info: self.info.clone(),
        }
    }
}

impl Drop for ActiveInfo {
    fn drop(&mut self) {
        let mut info = self.info.borrow_mut();
        info.active -= 1;
        if info.active == 0 {
            if let Some(waker) = info.waker.take() {
                waker.wake();
            }
        }
    }
}

impl std::future::Future for WaitUntilZero {
    type Output = io::Result<()>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
        let mut info = self.info.borrow_mut();
        if info.active == 0 {
            std::task::Poll::Ready(Ok(()))
        } else {
            info.waker = Some(cx.waker().clone());
            std::task::Poll::Pending
        }
    }
}
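
// An illustrative sketch (added by this edit, not part of the original source)
// of the contract documented above: `WaitUntilZero` stays pending while any
// clone of its `ActiveInfo` is alive and resolves once the last one is
// dropped. It assumes `futures_03::executor::block_on` is available, which the
// `ThreadPool` import at the top of this file already implies.
#[cfg(test)]
mod wait_until_zero_sketch {
    use super::WaitUntilZero;

    #[test]
    fn resolves_only_after_last_active_info_drops() {
        let (wait, info) = WaitUntilZero::new();
        let extra = info.clone();
        drop(info);
        // One `ActiveInfo` (`extra`) is still alive here, so polling the
        // future now would yield `Pending`. Dropping it lets it resolve.
        drop(extra);
        futures_03::executor::block_on(wait).unwrap();
    }
}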