1 // Copyright 2016 Mozilla Foundation
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // For tokio_io::codec::length_delimited::Framed;
16 #![allow(deprecated)]
17
18 use crate::cache::{storage_from_config, Storage};
19 use crate::compiler::{
20 get_compiler_info, CacheControl, CompileResult, Compiler, CompilerArguments, CompilerHasher,
21 CompilerKind, CompilerProxy, DistType, MissType,
22 };
23 #[cfg(feature = "dist-client")]
24 use crate::config;
25 use crate::config::Config;
26 use crate::dist;
27 use crate::jobserver::Client;
28 use crate::mock_command::{CommandCreatorSync, ProcessCommandCreator};
29 use crate::protocol::{Compile, CompileFinished, CompileResponse, Request, Response};
30 use crate::util;
31 #[cfg(feature = "dist-client")]
32 use anyhow::Context as _;
33 use filetime::FileTime;
34 use futures::sync::mpsc;
35 use futures::{future, stream, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
36 use futures_03::compat::Compat;
37 use futures_03::executor::ThreadPool;
38 use number_prefix::NumberPrefix;
39 use std::cell::RefCell;
40 use std::collections::HashMap;
41 use std::env;
42 use std::ffi::{OsStr, OsString};
43 use std::fs::metadata;
44 use std::io::{self, Write};
45 #[cfg(feature = "dist-client")]
46 use std::mem;
47 use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
48 use std::path::PathBuf;
49 use std::pin::Pin;
50 use std::process::{ExitStatus, Output};
51 use std::rc::Rc;
52 use std::sync::Arc;
53 #[cfg(feature = "dist-client")]
54 use std::sync::Mutex;
55 use std::task::{Context, Waker};
56 use std::time::Duration;
57 use std::time::Instant;
58 use std::u64;
59 use tokio_compat::runtime::current_thread::Runtime;
60 use tokio_io::codec::length_delimited;
61 use tokio_io::codec::length_delimited::Framed;
62 use tokio_io::{AsyncRead, AsyncWrite};
63 use tokio_serde_bincode::{ReadBincode, WriteBincode};
64 use tokio_tcp::TcpListener;
65 use tokio_timer::{Delay, Timeout};
66 use tower::Service;
67
68 use crate::errors::*;
69
70 /// If the server is idle for this many seconds, shut down.
71 const DEFAULT_IDLE_TIMEOUT: u64 = 600;
72
73 /// If the dist client couldn't be created, retry creation at this number
74 /// of seconds from now (or later)
75 #[cfg(feature = "dist-client")]
76 const DIST_CLIENT_RECREATE_TIMEOUT: Duration = Duration::from_secs(30);
77
78 /// Result of background server startup.
79 #[derive(Debug, Serialize, Deserialize)]
80 pub enum ServerStartup {
81 /// Server started successfully on `port`.
82 Ok { port: u16 },
83 /// Server Addr already in suse
84 AddrInUse,
85 /// Timed out waiting for server startup.
86 TimedOut,
87 /// Server encountered an error.
88 Err { reason: String },
89 }
90
91 /// Get the time the server should idle for before shutting down.
get_idle_timeout() -> u6492 fn get_idle_timeout() -> u64 {
93 // A value of 0 disables idle shutdown entirely.
94 env::var("SCCACHE_IDLE_TIMEOUT")
95 .ok()
96 .and_then(|s| s.parse().ok())
97 .unwrap_or(DEFAULT_IDLE_TIMEOUT)
98 }
99
notify_server_startup_internal<W: Write>(mut w: W, status: ServerStartup) -> Result<()>100 fn notify_server_startup_internal<W: Write>(mut w: W, status: ServerStartup) -> Result<()> {
101 util::write_length_prefixed_bincode(&mut w, status)
102 }
103
104 #[cfg(unix)]
notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()>105 fn notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()> {
106 use std::os::unix::net::UnixStream;
107 let name = match *name {
108 Some(ref s) => s,
109 None => return Ok(()),
110 };
111 debug!("notify_server_startup({:?})", status);
112 let stream = UnixStream::connect(name)?;
113 notify_server_startup_internal(stream, status)
114 }
115
116 #[cfg(windows)]
notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()>117 fn notify_server_startup(name: &Option<OsString>, status: ServerStartup) -> Result<()> {
118 use std::fs::OpenOptions;
119
120 let name = match *name {
121 Some(ref s) => s,
122 None => return Ok(()),
123 };
124 let pipe = OpenOptions::new().write(true).read(true).open(name)?;
125 notify_server_startup_internal(pipe, status)
126 }
127
128 #[cfg(unix)]
get_signal(status: ExitStatus) -> i32129 fn get_signal(status: ExitStatus) -> i32 {
130 use std::os::unix::prelude::*;
131 status.signal().expect("must have signal")
132 }
133 #[cfg(windows)]
get_signal(_status: ExitStatus) -> i32134 fn get_signal(_status: ExitStatus) -> i32 {
135 panic!("no signals on windows")
136 }
137
138 pub struct DistClientContainer {
139 // The actual dist client state
140 #[cfg(feature = "dist-client")]
141 state: Mutex<DistClientState>,
142 }
143
144 #[cfg(feature = "dist-client")]
145 struct DistClientConfig {
146 // Reusable items tied to an SccacheServer instance
147 pool: ThreadPool,
148
149 // From the static dist configuration
150 scheduler_url: Option<config::HTTPUrl>,
151 auth: config::DistAuth,
152 cache_dir: PathBuf,
153 toolchain_cache_size: u64,
154 toolchains: Vec<config::DistToolchainConfig>,
155 rewrite_includes_only: bool,
156 }
157
158 #[cfg(feature = "dist-client")]
159 enum DistClientState {
160 #[cfg(feature = "dist-client")]
161 Some(Box<DistClientConfig>, Arc<dyn dist::Client>),
162 #[cfg(feature = "dist-client")]
163 FailWithMessage(Box<DistClientConfig>, String),
164 #[cfg(feature = "dist-client")]
165 RetryCreateAt(Box<DistClientConfig>, Instant),
166 Disabled,
167 }
168
169 #[cfg(not(feature = "dist-client"))]
170 impl DistClientContainer {
171 #[cfg(not(feature = "dist-client"))]
new(config: &Config, _: &ThreadPool) -> Self172 fn new(config: &Config, _: &ThreadPool) -> Self {
173 if config.dist.scheduler_url.is_some() {
174 warn!("Scheduler address configured but dist feature disabled, disabling distributed sccache")
175 }
176 Self {}
177 }
178
new_disabled() -> Self179 pub fn new_disabled() -> Self {
180 Self {}
181 }
182
reset_state(&self)183 pub fn reset_state(&self) {}
184
get_status(&self) -> DistInfo185 pub fn get_status(&self) -> DistInfo {
186 DistInfo::Disabled("dist-client feature not selected".to_string())
187 }
188
get_client(&self) -> Result<Option<Arc<dyn dist::Client>>>189 fn get_client(&self) -> Result<Option<Arc<dyn dist::Client>>> {
190 Ok(None)
191 }
192 }
193
194 #[cfg(feature = "dist-client")]
195 impl DistClientContainer {
new(config: &Config, pool: &ThreadPool) -> Self196 fn new(config: &Config, pool: &ThreadPool) -> Self {
197 let config = DistClientConfig {
198 pool: pool.clone(),
199 scheduler_url: config.dist.scheduler_url.clone(),
200 auth: config.dist.auth.clone(),
201 cache_dir: config.dist.cache_dir.clone(),
202 toolchain_cache_size: config.dist.toolchain_cache_size,
203 toolchains: config.dist.toolchains.clone(),
204 rewrite_includes_only: config.dist.rewrite_includes_only,
205 };
206 let state = Self::create_state(config);
207 Self {
208 state: Mutex::new(state),
209 }
210 }
211
new_disabled() -> Self212 pub fn new_disabled() -> Self {
213 Self {
214 state: Mutex::new(DistClientState::Disabled),
215 }
216 }
217
reset_state(&self)218 pub fn reset_state(&self) {
219 let mut guard = self.state.lock();
220 let state = guard.as_mut().unwrap();
221 let state: &mut DistClientState = &mut **state;
222 match mem::replace(state, DistClientState::Disabled) {
223 DistClientState::Some(cfg, _)
224 | DistClientState::FailWithMessage(cfg, _)
225 | DistClientState::RetryCreateAt(cfg, _) => {
226 warn!("State reset. Will recreate");
227 *state =
228 DistClientState::RetryCreateAt(cfg, Instant::now() - Duration::from_secs(1));
229 }
230 DistClientState::Disabled => (),
231 }
232 }
233
get_status(&self) -> DistInfo234 pub fn get_status(&self) -> DistInfo {
235 let mut guard = self.state.lock();
236 let state = guard.as_mut().unwrap();
237 let state: &mut DistClientState = &mut **state;
238 match state {
239 DistClientState::Disabled => DistInfo::Disabled("disabled".to_string()),
240 DistClientState::FailWithMessage(cfg, _) => DistInfo::NotConnected(
241 cfg.scheduler_url.clone(),
242 "enabled, auth not configured".to_string(),
243 ),
244 DistClientState::RetryCreateAt(cfg, _) => DistInfo::NotConnected(
245 cfg.scheduler_url.clone(),
246 "enabled, not connected, will retry".to_string(),
247 ),
248 DistClientState::Some(cfg, client) => match client.do_get_status().wait() {
249 Ok(res) => DistInfo::SchedulerStatus(cfg.scheduler_url.clone(), res),
250 Err(_) => DistInfo::NotConnected(
251 cfg.scheduler_url.clone(),
252 "could not communicate with scheduler".to_string(),
253 ),
254 },
255 }
256 }
257
get_client(&self) -> Result<Option<Arc<dyn dist::Client>>>258 fn get_client(&self) -> Result<Option<Arc<dyn dist::Client>>> {
259 let mut guard = self.state.lock();
260 let state = guard.as_mut().unwrap();
261 let state: &mut DistClientState = &mut **state;
262 Self::maybe_recreate_state(state);
263 let res = match state {
264 DistClientState::Some(_, dc) => Ok(Some(dc.clone())),
265 DistClientState::Disabled | DistClientState::RetryCreateAt(_, _) => Ok(None),
266 DistClientState::FailWithMessage(_, msg) => Err(anyhow!(msg.clone())),
267 };
268 if res.is_err() {
269 let config = match mem::replace(state, DistClientState::Disabled) {
270 DistClientState::FailWithMessage(config, _) => config,
271 _ => unreachable!(),
272 };
273 // The client is most likely mis-configured, make sure we
274 // re-create on our next attempt.
275 *state =
276 DistClientState::RetryCreateAt(config, Instant::now() - Duration::from_secs(1));
277 }
278 res
279 }
280
maybe_recreate_state(state: &mut DistClientState)281 fn maybe_recreate_state(state: &mut DistClientState) {
282 if let DistClientState::RetryCreateAt(_, instant) = *state {
283 if instant > Instant::now() {
284 return;
285 }
286 let config = match mem::replace(state, DistClientState::Disabled) {
287 DistClientState::RetryCreateAt(config, _) => config,
288 _ => unreachable!(),
289 };
290 info!("Attempting to recreate the dist client");
291 *state = Self::create_state(*config)
292 }
293 }
294
295 // Attempt to recreate the dist client
create_state(config: DistClientConfig) -> DistClientState296 fn create_state(config: DistClientConfig) -> DistClientState {
297 macro_rules! try_or_retry_later {
298 ($v:expr) => {{
299 match $v {
300 Ok(v) => v,
301 Err(e) => {
302 // `{:?}` prints the full cause chain and backtrace.
303 error!("{:?}", e);
304 return DistClientState::RetryCreateAt(
305 Box::new(config),
306 Instant::now() + DIST_CLIENT_RECREATE_TIMEOUT,
307 );
308 }
309 }
310 }};
311 }
312
313 macro_rules! try_or_fail_with_message {
314 ($v:expr) => {{
315 match $v {
316 Ok(v) => v,
317 Err(e) => {
318 // `{:?}` prints the full cause chain and backtrace.
319 let errmsg = format!("{:?}", e);
320 error!("{}", errmsg);
321 return DistClientState::FailWithMessage(
322 Box::new(config),
323 errmsg.to_string(),
324 );
325 }
326 }
327 }};
328 }
329 match config.scheduler_url {
330 Some(ref addr) => {
331 let url = addr.to_url();
332 info!("Enabling distributed sccache to {}", url);
333 let auth_token = match &config.auth {
334 config::DistAuth::Token { token } => Ok(token.to_owned()),
335 config::DistAuth::Oauth2CodeGrantPKCE { auth_url, .. }
336 | config::DistAuth::Oauth2Implicit { auth_url, .. } => {
337 Self::get_cached_config_auth_token(auth_url)
338 }
339 };
340 let auth_token = try_or_fail_with_message!(auth_token
341 .context("could not load client auth token, run |sccache --dist-auth|"));
342 let dist_client = dist::http::Client::new(
343 &config.pool,
344 url,
345 &config.cache_dir.join("client"),
346 config.toolchain_cache_size,
347 &config.toolchains,
348 auth_token,
349 config.rewrite_includes_only,
350 );
351 let dist_client =
352 try_or_retry_later!(dist_client.context("failure during dist client creation"));
353 use crate::dist::Client;
354 match dist_client.do_get_status().wait() {
355 Ok(res) => {
356 info!(
357 "Successfully created dist client with {:?} cores across {:?} servers",
358 res.num_cpus, res.num_servers
359 );
360 DistClientState::Some(Box::new(config), Arc::new(dist_client))
361 }
362 Err(_) => {
363 warn!("Scheduler address configured, but could not communicate with scheduler");
364 DistClientState::RetryCreateAt(
365 Box::new(config),
366 Instant::now() + DIST_CLIENT_RECREATE_TIMEOUT,
367 )
368 }
369 }
370 }
371 None => {
372 info!("No scheduler address configured, disabling distributed sccache");
373 DistClientState::Disabled
374 }
375 }
376 }
377
get_cached_config_auth_token(auth_url: &str) -> Result<String>378 fn get_cached_config_auth_token(auth_url: &str) -> Result<String> {
379 let cached_config = config::CachedConfig::reload()?;
380 cached_config
381 .with(|c| c.dist.auth_tokens.get(auth_url).map(String::to_owned))
382 .with_context(|| format!("token for url {} not present in cached config", auth_url))
383 }
384 }
385
386 /// Start an sccache server, listening on `port`.
387 ///
388 /// Spins an event loop handling client connections until a client
389 /// requests a shutdown.
start_server(config: &Config, port: u16) -> Result<()>390 pub fn start_server(config: &Config, port: u16) -> Result<()> {
391 info!("start_server: port: {}", port);
392 let client = unsafe { Client::new() };
393 let runtime = Runtime::new()?;
394 let pool = ThreadPool::builder()
395 .pool_size(std::cmp::max(20, 2 * num_cpus::get()))
396 .create()?;
397 let dist_client = DistClientContainer::new(config, &pool);
398 let storage = storage_from_config(config, &pool);
399 let res = SccacheServer::<ProcessCommandCreator>::new(
400 port,
401 pool,
402 runtime,
403 client,
404 dist_client,
405 storage,
406 );
407 let notify = env::var_os("SCCACHE_STARTUP_NOTIFY");
408 match res {
409 Ok(srv) => {
410 let port = srv.port();
411 info!("server started, listening on port {}", port);
412 notify_server_startup(¬ify, ServerStartup::Ok { port })?;
413 srv.run(future::empty::<(), ()>())?;
414 Ok(())
415 }
416 Err(e) => {
417 error!("failed to start server: {}", e);
418 match e.downcast_ref::<io::Error>() {
419 Some(io_err) if io::ErrorKind::AddrInUse == io_err.kind() => {
420 notify_server_startup(¬ify, ServerStartup::AddrInUse)?;
421 }
422 _ => {
423 let reason = e.to_string();
424 notify_server_startup(¬ify, ServerStartup::Err { reason })?;
425 }
426 };
427 Err(e)
428 }
429 }
430 }
431
432 pub struct SccacheServer<C: CommandCreatorSync> {
433 runtime: Runtime,
434 listener: TcpListener,
435 rx: mpsc::Receiver<ServerMessage>,
436 timeout: Duration,
437 service: SccacheService<C>,
438 wait: WaitUntilZero,
439 }
440
441 impl<C: CommandCreatorSync> SccacheServer<C> {
new( port: u16, pool: ThreadPool, runtime: Runtime, client: Client, dist_client: DistClientContainer, storage: Arc<dyn Storage>, ) -> Result<SccacheServer<C>>442 pub fn new(
443 port: u16,
444 pool: ThreadPool,
445 runtime: Runtime,
446 client: Client,
447 dist_client: DistClientContainer,
448 storage: Arc<dyn Storage>,
449 ) -> Result<SccacheServer<C>> {
450 let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), port);
451 let listener = TcpListener::bind(&SocketAddr::V4(addr))?;
452
453 // Prepare the service which we'll use to service all incoming TCP
454 // connections.
455 let (tx, rx) = mpsc::channel(1);
456 let (wait, info) = WaitUntilZero::new();
457 let service = SccacheService::new(dist_client, storage, &client, pool, tx, info);
458
459 Ok(SccacheServer {
460 runtime,
461 listener,
462 rx,
463 service,
464 timeout: Duration::from_secs(get_idle_timeout()),
465 wait,
466 })
467 }
468
469 /// Configures how long this server will be idle before shutting down.
470 #[allow(dead_code)]
set_idle_timeout(&mut self, timeout: Duration)471 pub fn set_idle_timeout(&mut self, timeout: Duration) {
472 self.timeout = timeout;
473 }
474
475 /// Set the storage this server will use.
476 #[allow(dead_code)]
set_storage(&mut self, storage: Arc<dyn Storage>)477 pub fn set_storage(&mut self, storage: Arc<dyn Storage>) {
478 self.service.storage = storage;
479 }
480
481 /// Returns a reference to a thread pool to run work on
482 #[allow(dead_code)]
pool(&self) -> &ThreadPool483 pub fn pool(&self) -> &ThreadPool {
484 &self.service.pool
485 }
486
487 /// Returns a reference to the command creator this server will use
488 #[allow(dead_code)]
command_creator(&self) -> &C489 pub fn command_creator(&self) -> &C {
490 &self.service.creator
491 }
492
493 /// Returns the port that this server is bound to
494 #[allow(dead_code)]
port(&self) -> u16495 pub fn port(&self) -> u16 {
496 self.listener.local_addr().unwrap().port()
497 }
498
499 /// Runs this server to completion.
500 ///
501 /// If the `shutdown` future resolves then the server will be shut down,
502 /// otherwise the server may naturally shut down if it becomes idle for too
503 /// long anyway.
run<F>(self, shutdown: F) -> io::Result<()> where F: Future,504 pub fn run<F>(self, shutdown: F) -> io::Result<()>
505 where
506 F: Future,
507 {
508 self._run(Box::new(shutdown.then(|_| Ok(()))))
509 }
510
_run<'a>(self, shutdown: Box<dyn Future<Item = (), Error = ()> + 'a>) -> io::Result<()>511 fn _run<'a>(self, shutdown: Box<dyn Future<Item = (), Error = ()> + 'a>) -> io::Result<()> {
512 let SccacheServer {
513 mut runtime,
514 listener,
515 rx,
516 service,
517 timeout,
518 wait,
519 } = self;
520
521 // Create our "server future" which will simply handle all incoming
522 // connections in separate tasks.
523 let server = listener.incoming().for_each(move |socket| {
524 trace!("incoming connection");
525 tokio_compat::runtime::current_thread::TaskExecutor::current()
526 .spawn_local(Box::new(service.clone().bind(socket).map_err(|err| {
527 error!("{}", err);
528 })))
529 .unwrap();
530 Ok(())
531 });
532
533 // Right now there's a whole bunch of ways to shut down this server for
534 // various purposes. These include:
535 //
536 // 1. The `shutdown` future above.
537 // 2. An RPC indicating the server should shut down
538 // 3. A period of inactivity (no requests serviced)
539 //
540 // These are all encapsulated wih the future that we're creating below.
541 // The `ShutdownOrInactive` indicates the RPC or the period of
542 // inactivity, and this is then select'd with the `shutdown` future
543 // passed to this function.
544
545 let shutdown = shutdown.map(|a| {
546 info!("shutting down due to explicit signal");
547 a
548 });
549
550 let mut futures = vec![
551 Box::new(server) as Box<dyn Future<Item = _, Error = _>>,
552 Box::new(
553 shutdown
554 .map_err(|()| io::Error::new(io::ErrorKind::Other, "shutdown signal failed")),
555 ),
556 ];
557
558 let shutdown_idle = ShutdownOrInactive {
559 rx,
560 timeout: if timeout != Duration::new(0, 0) {
561 Some(Delay::new(Instant::now() + timeout))
562 } else {
563 None
564 },
565 timeout_dur: timeout,
566 };
567 futures.push(Box::new(shutdown_idle.map(|a| {
568 info!("shutting down due to being idle or request");
569 a
570 })));
571
572 let server = future::select_all(futures);
573 runtime.block_on(server).map_err(|p| p.0)?;
574
575 info!(
576 "moving into the shutdown phase now, waiting at most 10 seconds \
577 for all client requests to complete"
578 );
579
580 // Once our server has shut down either due to inactivity or a manual
581 // request we still need to give a bit of time for all active
582 // connections to finish. This `wait` future will resolve once all
583 // instances of `SccacheService` have been dropped.
584 //
585 // Note that we cap the amount of time this can take, however, as we
586 // don't want to wait *too* long.
587 runtime
588 .block_on(Timeout::new(Compat::new(wait), Duration::new(30, 0)))
589 .map_err(|e| {
590 if e.is_inner() {
591 e.into_inner().unwrap()
592 } else {
593 io::Error::new(io::ErrorKind::Other, e)
594 }
595 })?;
596
597 info!("ok, fully shutting down now");
598
599 Ok(())
600 }
601 }
602
603 type CompilerMap<C> = HashMap<PathBuf, Option<CompilerCacheEntry<C>>>;
604
605 /// entry of the compiler cache
606 struct CompilerCacheEntry<C: CommandCreatorSync> {
607 /// compiler argument trait obj
608 pub compiler: Box<dyn Compiler<C>>,
609 /// modification time of the compilers executable file
610 pub mtime: FileTime,
611 /// distributed compilation extra info
612 pub dist_info: Option<(PathBuf, FileTime)>,
613 }
614
615 impl<C> CompilerCacheEntry<C>
616 where
617 C: CommandCreatorSync,
618 {
new( compiler: Box<dyn Compiler<C>>, mtime: FileTime, dist_info: Option<(PathBuf, FileTime)>, ) -> Self619 fn new(
620 compiler: Box<dyn Compiler<C>>,
621 mtime: FileTime,
622 dist_info: Option<(PathBuf, FileTime)>,
623 ) -> Self {
624 Self {
625 compiler,
626 mtime,
627 dist_info,
628 }
629 }
630 }
631 /// Service implementation for sccache
632 #[derive(Clone)]
633 struct SccacheService<C: CommandCreatorSync> {
634 /// Server statistics.
635 stats: Rc<RefCell<ServerStats>>,
636
637 /// Distributed sccache client
638 dist_client: Rc<DistClientContainer>,
639
640 /// Cache storage.
641 storage: Arc<dyn Storage>,
642
643 /// A cache of known compiler info.
644 compilers: Rc<RefCell<CompilerMap<C>>>,
645
646 /// map the cwd with compiler proxy path to a proxy resolver, which
647 /// will dynamically resolve the input compiler for the current context
648 /// (usually file or current working directory)
649 /// the associated `FileTime` is the modification time of
650 /// the compiler proxy, in order to track updates of the proxy itself
651 compiler_proxies: Rc<RefCell<HashMap<PathBuf, (Box<dyn CompilerProxy<C>>, FileTime)>>>,
652
653 /// Thread pool to execute work in
654 pool: ThreadPool,
655
656 /// An object for creating commands.
657 ///
658 /// This is mostly useful for unit testing, where we
659 /// can mock this out.
660 creator: C,
661
662 /// Message channel used to learn about requests received by this server.
663 ///
664 /// Note that messages sent along this channel will keep the server alive
665 /// (reset the idle timer) and this channel can also be used to shut down
666 /// the entire server immediately via a message.
667 tx: mpsc::Sender<ServerMessage>,
668
669 /// Information tracking how many services (connected clients) are active.
670 info: ActiveInfo,
671 }
672
673 type SccacheRequest = Message<Request, Body<()>>;
674 type SccacheResponse = Message<Response, Body<Response>>;
675
676 /// Messages sent from all services to the main event loop indicating activity.
677 ///
678 /// Whenever a request is receive a `Request` message is sent which will reset
679 /// the idle shutdown timer, and otherwise a `Shutdown` message indicates that
680 /// a server shutdown was requested via an RPC.
681 pub enum ServerMessage {
682 /// A message sent whenever a request is received.
683 Request,
684 /// Message sent whenever a shutdown request is received.
685 Shutdown,
686 }
687
688 impl<C> Service<SccacheRequest> for SccacheService<C>
689 where
690 C: CommandCreatorSync + 'static,
691 {
692 type Response = SccacheResponse;
693 type Error = Error;
694 type Future = SFuture<Self::Response>;
695
call(&mut self, req: SccacheRequest) -> Self::Future696 fn call(&mut self, req: SccacheRequest) -> Self::Future {
697 trace!("handle_client");
698
699 // Opportunistically let channel know that we've received a request. We
700 // ignore failures here as well as backpressure as it's not imperative
701 // that every message is received.
702 drop(self.tx.clone().start_send(ServerMessage::Request));
703
704 let res: SFuture<Response> = match req.into_inner() {
705 Request::Compile(compile) => {
706 debug!("handle_client: compile");
707 self.stats.borrow_mut().compile_requests += 1;
708 return self.handle_compile(compile);
709 }
710 Request::GetStats => {
711 debug!("handle_client: get_stats");
712 Box::new(self.get_info().map(|i| Response::Stats(Box::new(i))))
713 }
714 Request::DistStatus => {
715 debug!("handle_client: dist_status");
716 Box::new(self.get_dist_status().map(Response::DistStatus))
717 }
718 Request::ZeroStats => {
719 debug!("handle_client: zero_stats");
720 self.zero_stats();
721 Box::new(self.get_info().map(|i| Response::Stats(Box::new(i))))
722 }
723 Request::Shutdown => {
724 debug!("handle_client: shutdown");
725 let future = self
726 .tx
727 .clone()
728 .send(ServerMessage::Shutdown)
729 .then(|_| Ok(()));
730 let info_future = self.get_info();
731 return Box::new(future.join(info_future).map(move |(_, info)| {
732 Message::WithoutBody(Response::ShuttingDown(Box::new(info)))
733 }));
734 }
735 };
736
737 Box::new(res.map(Message::WithoutBody))
738 }
739
poll_ready(&mut self) -> Poll<(), Self::Error>740 fn poll_ready(&mut self) -> Poll<(), Self::Error> {
741 Ok(Async::Ready(()))
742 }
743 }
744
745 impl<C> SccacheService<C>
746 where
747 C: CommandCreatorSync,
748 {
new( dist_client: DistClientContainer, storage: Arc<dyn Storage>, client: &Client, pool: ThreadPool, tx: mpsc::Sender<ServerMessage>, info: ActiveInfo, ) -> SccacheService<C>749 pub fn new(
750 dist_client: DistClientContainer,
751 storage: Arc<dyn Storage>,
752 client: &Client,
753 pool: ThreadPool,
754 tx: mpsc::Sender<ServerMessage>,
755 info: ActiveInfo,
756 ) -> SccacheService<C> {
757 SccacheService {
758 stats: Rc::new(RefCell::new(ServerStats::default())),
759 dist_client: Rc::new(dist_client),
760 storage,
761 compilers: Rc::new(RefCell::new(HashMap::new())),
762 compiler_proxies: Rc::new(RefCell::new(HashMap::new())),
763 pool,
764 creator: C::new(client),
765 tx,
766 info,
767 }
768 }
769
bind<T>(mut self, socket: T) -> impl Future<Item = (), Error = Error> where T: AsyncRead + AsyncWrite + 'static,770 fn bind<T>(mut self, socket: T) -> impl Future<Item = (), Error = Error>
771 where
772 T: AsyncRead + AsyncWrite + 'static,
773 {
774 let mut builder = length_delimited::Builder::new();
775 if let Ok(max_frame_length_str) = env::var("SCCACHE_MAX_FRAME_LENGTH") {
776 if let Ok(max_frame_length) = max_frame_length_str.parse::<usize>() {
777 builder.max_frame_length(max_frame_length);
778 } else {
779 warn!("Content of SCCACHE_MAX_FRAME_LENGTH is not a valid number, using default");
780 }
781 }
782 let io = builder.new_framed(socket);
783
784 let (sink, stream) = SccacheTransport {
785 inner: WriteBincode::new(ReadBincode::new(io)),
786 }
787 .split();
788 let sink = sink.sink_from_err::<Error>();
789
790 stream
791 .from_err::<Error>()
792 .and_then(move |input| self.call(input))
793 .and_then(|message| {
794 let f: Box<dyn Stream<Item = _, Error = _>> = match message {
795 Message::WithoutBody(message) => Box::new(stream::once(Ok(Frame::Message {
796 message,
797 body: false,
798 }))),
799 Message::WithBody(message, body) => Box::new(
800 stream::once(Ok(Frame::Message {
801 message,
802 body: true,
803 }))
804 .chain(Compat::new(body).map(|chunk| Frame::Body { chunk: Some(chunk) }))
805 .chain(stream::once(Ok(Frame::Body { chunk: None }))),
806 ),
807 };
808 Ok(f.from_err::<Error>())
809 })
810 .flatten()
811 .forward(sink)
812 .map(|_| ())
813 }
814
815 /// Get dist status.
get_dist_status(&self) -> SFuture<DistInfo>816 fn get_dist_status(&self) -> SFuture<DistInfo> {
817 f_ok(self.dist_client.get_status())
818 }
819
820 /// Get info and stats about the cache.
get_info(&self) -> SFuture<ServerInfo>821 fn get_info(&self) -> SFuture<ServerInfo> {
822 let stats = self.stats.borrow().clone();
823 let cache_location = self.storage.location();
824 Box::new(
825 self.storage
826 .current_size()
827 .join(self.storage.max_size())
828 .map(move |(cache_size, max_cache_size)| ServerInfo {
829 stats,
830 cache_location,
831 cache_size,
832 max_cache_size,
833 }),
834 )
835 }
836
837 /// Zero stats about the cache.
zero_stats(&self)838 fn zero_stats(&self) {
839 *self.stats.borrow_mut() = ServerStats::default();
840 }
841
842 /// Handle a compile request from a client.
843 ///
844 /// This will handle a compile request entirely, generating a response with
845 /// the inital information and an optional body which will eventually
846 /// contain the results of the compilation.
handle_compile(&self, compile: Compile) -> SFuture<SccacheResponse>847 fn handle_compile(&self, compile: Compile) -> SFuture<SccacheResponse> {
848 let exe = compile.exe;
849 let cmd = compile.args;
850 let cwd: PathBuf = compile.cwd.into();
851 let env_vars = compile.env_vars;
852 let me = self.clone();
853
854 Box::new(
855 self.compiler_info(exe.into(), cwd.clone(), &env_vars)
856 .map(move |info| me.check_compiler(info, cmd, cwd, env_vars)),
857 )
858 }
859
860 /// Look up compiler info from the cache for the compiler `path`.
861 /// If not cached, determine the compiler type and cache the result.
compiler_info( &self, path: PathBuf, cwd: PathBuf, env: &[(OsString, OsString)], ) -> SFuture<Result<Box<dyn Compiler<C>>>>862 fn compiler_info(
863 &self,
864 path: PathBuf,
865 cwd: PathBuf,
866 env: &[(OsString, OsString)],
867 ) -> SFuture<Result<Box<dyn Compiler<C>>>> {
868 trace!("compiler_info");
869
870 let me = self.clone();
871 let me1 = self.clone();
872
873 // lookup if compiler proxy exists for the current compiler path
874
875 let path2 = path.clone();
876 let path1 = path.clone();
877 let env = env.to_vec();
878
879 let resolve_w_proxy = {
880 let compiler_proxies_borrow = self.compiler_proxies.borrow();
881
882 if let Some((compiler_proxy, _filetime)) = compiler_proxies_borrow.get(&path) {
883 let fut = compiler_proxy.resolve_proxied_executable(
884 self.creator.clone(),
885 cwd.clone(),
886 env.as_slice(),
887 );
888 Box::new(fut.then(|res: Result<_>| Ok(res.ok())))
889 } else {
890 f_ok(None)
891 }
892 };
893
894 // use the supplied compiler path as fallback, lookup its modification time too
895 let w_fallback = resolve_w_proxy.then(move |res: Result<Option<(PathBuf, FileTime)>>| {
896 let opt = match res {
897 Ok(Some(x)) => Some(x), // TODO resolve the path right away
898 _ => {
899 // fallback to using the path directly
900 metadata(&path2)
901 .map(|attr| FileTime::from_last_modification_time(&attr))
902 .ok()
903 .map(move |filetime| (path2, filetime))
904 }
905 };
906 f_ok(opt)
907 });
908
909 let lookup_compiler = w_fallback.and_then(move |opt: Option<(PathBuf, FileTime)>| {
910 let (resolved_compiler_path, mtime) =
911 opt.expect("Must contain sane data, otherwise mtime is not avail");
912
913 let dist_info = match me1.dist_client.get_client() {
914 Ok(Some(ref client)) => {
915 if let Some(archive) = client.get_custom_toolchain(&resolved_compiler_path) {
916 match metadata(&archive)
917 .map(|attr| FileTime::from_last_modification_time(&attr))
918 {
919 Ok(mtime) => Some((archive, mtime)),
920 _ => None,
921 }
922 } else {
923 None
924 }
925 }
926 _ => None,
927 };
928
929 let opt = match me1.compilers.borrow().get(&resolved_compiler_path) {
930 // It's a hit only if the mtime and dist archive data matches.
931 Some(&Some(ref entry)) => {
932 if entry.mtime == mtime && entry.dist_info == dist_info {
933 Some(entry.compiler.clone())
934 } else {
935 None
936 }
937 }
938 _ => None,
939 };
940 f_ok((resolved_compiler_path, mtime, opt, dist_info))
941 });
942
943 let obtain = lookup_compiler.and_then(
944 move |(resolved_compiler_path, mtime, opt, dist_info): (
945 PathBuf,
946 FileTime,
947 Option<Box<dyn Compiler<C>>>,
948 Option<(PathBuf, FileTime)>,
949 )| {
950 match opt {
951 Some(info) => {
952 trace!("compiler_info cache hit");
953 f_ok(Ok(info))
954 }
955 None => {
956 trace!("compiler_info cache miss");
957 // Check the compiler type and return the result when
958 // finished. This generally involves invoking the compiler,
959 // so do it asynchronously.
960
961 // the compiler path might be compiler proxy, so it is important to use
962 // `path` (or its clone `path1`) to resolve using that one, not using `resolved_compiler_path`
963 let x = get_compiler_info::<C>(
964 me.creator.clone(),
965 &path1,
966 &cwd,
967 env.as_slice(),
968 &me.pool,
969 dist_info.clone().map(|(p, _)| p),
970 );
971
972 Box::new(x.then(
973 move |info: Result<(
974 Box<dyn Compiler<C>>,
975 Option<Box<dyn CompilerProxy<C>>>,
976 )>| {
977 match info {
978 Ok((ref c, ref proxy)) => {
979 // register the proxy for this compiler, so it will be used directly from now on
980 // and the true/resolved compiler will create table hits in the hash map
981 // based on the resolved path
982 if let Some(proxy) = proxy {
983 trace!(
984 "Inserting new path proxy {:?} @ {:?} -> {:?}",
985 &path,
986 &cwd,
987 resolved_compiler_path
988 );
989 let proxy: Box<dyn CompilerProxy<C>> =
990 proxy.box_clone();
991 me.compiler_proxies
992 .borrow_mut()
993 .insert(path, (proxy, mtime));
994 }
995 // TODO add some safety checks in case a proxy exists, that the initial `path` is not
996 // TODO the same as the resolved compiler binary
997
998 // cache
999 let map_info =
1000 CompilerCacheEntry::new(c.clone(), mtime, dist_info);
1001 trace!(
1002 "Inserting POSSIBLY PROXIED cache map info for {:?}",
1003 &resolved_compiler_path
1004 );
1005 me.compilers
1006 .borrow_mut()
1007 .insert(resolved_compiler_path, Some(map_info));
1008 }
1009 Err(_) => {
1010 trace!("Inserting PLAIN cache map info for {:?}", &path);
1011 me.compilers.borrow_mut().insert(path, None);
1012 }
1013 }
1014 // drop the proxy information, response is compiler only
1015 let r: Result<Box<dyn Compiler<C>>> = info.map(|info| info.0);
1016 f_ok(r)
1017 },
1018 ))
1019 }
1020 }
1021 },
1022 );
1023
1024 Box::new(obtain)
1025 }
1026
1027 /// Check that we can handle and cache `cmd` when run with `compiler`.
1028 /// If so, run `start_compile_task` to execute it.
check_compiler( &self, compiler: Result<Box<dyn Compiler<C>>>, cmd: Vec<OsString>, cwd: PathBuf, env_vars: Vec<(OsString, OsString)>, ) -> SccacheResponse1029 fn check_compiler(
1030 &self,
1031 compiler: Result<Box<dyn Compiler<C>>>,
1032 cmd: Vec<OsString>,
1033 cwd: PathBuf,
1034 env_vars: Vec<(OsString, OsString)>,
1035 ) -> SccacheResponse {
1036 let mut stats = self.stats.borrow_mut();
1037 match compiler {
1038 Err(e) => {
1039 debug!("check_compiler: Unsupported compiler: {}", e.to_string());
1040 stats.requests_unsupported_compiler += 1;
1041 return Message::WithoutBody(Response::Compile(
1042 CompileResponse::UnsupportedCompiler(OsString::from(e.to_string())),
1043 ));
1044 }
1045 Ok(c) => {
1046 debug!("check_compiler: Supported compiler");
1047 // Now check that we can handle this compiler with
1048 // the provided commandline.
1049 match c.parse_arguments(&cmd, &cwd) {
1050 CompilerArguments::Ok(hasher) => {
1051 debug!("parse_arguments: Ok: {:?}", cmd);
1052 stats.requests_executed += 1;
1053 let (tx, rx) = Body::pair();
1054 self.start_compile_task(c, hasher, cmd, cwd, env_vars, tx);
1055 let res = CompileResponse::CompileStarted;
1056 return Message::WithBody(Response::Compile(res), rx);
1057 }
1058 CompilerArguments::CannotCache(why, extra_info) => {
1059 if let Some(extra_info) = extra_info {
1060 debug!(
1061 "parse_arguments: CannotCache({}, {}): {:?}",
1062 why, extra_info, cmd
1063 )
1064 } else {
1065 debug!("parse_arguments: CannotCache({}): {:?}", why, cmd)
1066 }
1067 stats.requests_not_cacheable += 1;
1068 *stats.not_cached.entry(why.to_string()).or_insert(0) += 1;
1069 }
1070 CompilerArguments::NotCompilation => {
1071 debug!("parse_arguments: NotCompilation: {:?}", cmd);
1072 stats.requests_not_compile += 1;
1073 }
1074 }
1075 }
1076 }
1077
1078 let res = CompileResponse::UnhandledCompile;
1079 Message::WithoutBody(Response::Compile(res))
1080 }
1081
1082 /// Given compiler arguments `arguments`, look up
1083 /// a compile result in the cache or execute the compilation and store
1084 /// the result in the cache.
start_compile_task( &self, compiler: Box<dyn Compiler<C>>, hasher: Box<dyn CompilerHasher<C>>, arguments: Vec<OsString>, cwd: PathBuf, env_vars: Vec<(OsString, OsString)>, tx: mpsc::Sender<Result<Response>>, )1085 fn start_compile_task(
1086 &self,
1087 compiler: Box<dyn Compiler<C>>,
1088 hasher: Box<dyn CompilerHasher<C>>,
1089 arguments: Vec<OsString>,
1090 cwd: PathBuf,
1091 env_vars: Vec<(OsString, OsString)>,
1092 tx: mpsc::Sender<Result<Response>>,
1093 ) {
1094 let force_recache = env_vars
1095 .iter()
1096 .any(|&(ref k, ref _v)| k.as_os_str() == OsStr::new("SCCACHE_RECACHE"));
1097 let cache_control = if force_recache {
1098 CacheControl::ForceRecache
1099 } else {
1100 CacheControl::Default
1101 };
1102 let out_pretty = hasher.output_pretty().into_owned();
1103 let color_mode = hasher.color_mode();
1104 let result = hasher.get_cached_or_compile(
1105 self.dist_client.get_client(),
1106 self.creator.clone(),
1107 self.storage.clone(),
1108 arguments,
1109 cwd,
1110 env_vars,
1111 cache_control,
1112 self.pool.clone(),
1113 );
1114 let me = self.clone();
1115 let kind = compiler.kind();
1116 let task = result.then(move |result| {
1117 let mut cache_write = None;
1118 let mut stats = me.stats.borrow_mut();
1119 let mut res = CompileFinished {
1120 color_mode,
1121 ..Default::default()
1122 };
1123 match result {
1124 Ok((compiled, out)) => {
1125 match compiled {
1126 CompileResult::Error => {
1127 stats.cache_errors.increment(&kind);
1128 }
1129 CompileResult::CacheHit(duration) => {
1130 stats.cache_hits.increment(&kind);
1131 stats.cache_read_hit_duration += duration;
1132 }
1133 CompileResult::CacheMiss(miss_type, dist_type, duration, future) => {
1134 match dist_type {
1135 DistType::NoDist => {}
1136 DistType::Ok(id) => {
1137 let server = id.addr().to_string();
1138 let server_count =
1139 stats.dist_compiles.entry(server).or_insert(0);
1140 *server_count += 1;
1141 }
1142 DistType::Error => stats.dist_errors += 1,
1143 }
1144 match miss_type {
1145 MissType::Normal => {}
1146 MissType::ForcedRecache => {
1147 stats.forced_recaches += 1;
1148 }
1149 MissType::TimedOut => {
1150 stats.cache_timeouts += 1;
1151 }
1152 MissType::CacheReadError => {
1153 stats.cache_errors.increment(&kind);
1154 }
1155 }
1156 stats.cache_misses.increment(&kind);
1157 stats.cache_read_miss_duration += duration;
1158 cache_write = Some(future);
1159 }
1160 CompileResult::NotCacheable => {
1161 stats.cache_misses.increment(&kind);
1162 stats.non_cacheable_compilations += 1;
1163 }
1164 CompileResult::CompileFailed => {
1165 stats.compile_fails += 1;
1166 }
1167 };
1168 let Output {
1169 status,
1170 stdout,
1171 stderr,
1172 } = out;
1173 trace!("CompileFinished retcode: {}", status);
1174 match status.code() {
1175 Some(code) => res.retcode = Some(code),
1176 None => res.signal = Some(get_signal(status)),
1177 };
1178 res.stdout = stdout;
1179 res.stderr = stderr;
1180 }
1181 Err(err) => {
1182 match err.downcast::<ProcessError>() {
1183 Ok(ProcessError(output)) => {
1184 debug!("Compilation failed: {:?}", output);
1185 stats.compile_fails += 1;
1186 match output.status.code() {
1187 Some(code) => res.retcode = Some(code),
1188 None => res.signal = Some(get_signal(output.status)),
1189 };
1190 res.stdout = output.stdout;
1191 res.stderr = output.stderr;
1192 }
1193 Err(err) => match err.downcast::<HttpClientError>() {
1194 Ok(HttpClientError(msg)) => {
1195 me.dist_client.reset_state();
1196 let errmsg =
1197 format!("[{:?}] http error status: {}", out_pretty, msg);
1198 error!("{}", errmsg);
1199 res.retcode = Some(1);
1200 res.stderr = errmsg.as_bytes().to_vec();
1201 }
1202 Err(err) => {
1203 use std::fmt::Write;
1204
1205 error!("[{:?}] fatal error: {}", out_pretty, err);
1206
1207 let mut error = "sccache: encountered fatal error\n".to_string();
1208 let _ = writeln!(error, "sccache: error: {}", err);
1209 for e in err.chain() {
1210 error!("[{:?}] \t{}", out_pretty, e);
1211 let _ = writeln!(error, "sccache: caused by: {}", e);
1212 }
1213 stats.cache_errors.increment(&kind);
1214 //TODO: figure out a better way to communicate this?
1215 res.retcode = Some(-2);
1216 res.stderr = error.into_bytes();
1217 }
1218 },
1219 }
1220 }
1221 };
1222 let send = tx.send(Ok(Response::CompileFinished(res)));
1223
1224 let me = me.clone();
1225 let cache_write = cache_write.then(move |result| {
1226 match result {
1227 Err(e) => {
1228 debug!("Error executing cache write: {}", e);
1229 me.stats.borrow_mut().cache_write_errors += 1;
1230 }
1231 //TODO: save cache stats!
1232 Ok(Some(info)) => {
1233 debug!(
1234 "[{}]: Cache write finished in {}",
1235 info.object_file_pretty,
1236 util::fmt_duration_as_secs(&info.duration)
1237 );
1238 me.stats.borrow_mut().cache_writes += 1;
1239 me.stats.borrow_mut().cache_write_duration += info.duration;
1240 }
1241
1242 Ok(None) => {}
1243 }
1244 Ok(())
1245 });
1246
1247 send.join(cache_write).then(|_| Ok(()))
1248 });
1249
1250 tokio_compat::runtime::current_thread::TaskExecutor::current()
1251 .spawn_local(Box::new(task))
1252 .unwrap();
1253 }
1254 }
1255
1256 #[derive(Serialize, Deserialize, Debug, Clone, Default)]
1257 pub struct PerLanguageCount {
1258 counts: HashMap<String, u64>,
1259 }
1260
1261 impl PerLanguageCount {
increment(&mut self, kind: &CompilerKind)1262 fn increment(&mut self, kind: &CompilerKind) {
1263 let key = kind.lang_kind();
1264 let count = self.counts.entry(key).or_insert(0);
1265 *count += 1;
1266 }
1267
all(&self) -> u641268 pub fn all(&self) -> u64 {
1269 self.counts.values().sum()
1270 }
1271
get(&self, key: &str) -> Option<&u64>1272 pub fn get(&self, key: &str) -> Option<&u64> {
1273 self.counts.get(key)
1274 }
1275
new() -> PerLanguageCount1276 pub fn new() -> PerLanguageCount {
1277 Self::default()
1278 }
1279 }
1280
1281 /// Statistics about the server.
1282 #[derive(Serialize, Deserialize, Clone, Debug)]
1283 pub struct ServerStats {
1284 /// The count of client compile requests.
1285 pub compile_requests: u64,
1286 /// The count of client requests that used an unsupported compiler.
1287 pub requests_unsupported_compiler: u64,
1288 /// The count of client requests that were not compilation.
1289 pub requests_not_compile: u64,
1290 /// The count of client requests that were not cacheable.
1291 pub requests_not_cacheable: u64,
1292 /// The count of client requests that were executed.
1293 pub requests_executed: u64,
1294 /// The count of errors handling compile requests (per language).
1295 pub cache_errors: PerLanguageCount,
1296 /// The count of cache hits for handled compile requests (per language).
1297 pub cache_hits: PerLanguageCount,
1298 /// The count of cache misses for handled compile requests (per language).
1299 pub cache_misses: PerLanguageCount,
1300 /// The count of cache misses because the cache took too long to respond.
1301 pub cache_timeouts: u64,
1302 /// The count of errors reading cache entries.
1303 pub cache_read_errors: u64,
1304 /// The count of compilations which were successful but couldn't be cached.
1305 pub non_cacheable_compilations: u64,
1306 /// The count of compilations which forcibly ignored the cache.
1307 pub forced_recaches: u64,
1308 /// The count of errors writing to cache.
1309 pub cache_write_errors: u64,
1310 /// The number of successful cache writes.
1311 pub cache_writes: u64,
1312 /// The total time spent writing cache entries.
1313 pub cache_write_duration: Duration,
1314 /// The total time spent reading cache hits.
1315 pub cache_read_hit_duration: Duration,
1316 /// The total time spent reading cache misses.
1317 pub cache_read_miss_duration: Duration,
1318 /// The count of compilation failures.
1319 pub compile_fails: u64,
1320 /// Counts of reasons why compiles were not cached.
1321 pub not_cached: HashMap<String, usize>,
1322 /// The count of compilations that were successfully distributed indexed
1323 /// by the server that ran those compilations.
1324 pub dist_compiles: HashMap<String, usize>,
1325 /// The count of compilations that were distributed but failed and had to be re-run locally
1326 pub dist_errors: u64,
1327 }
1328
1329 /// Info and stats about the server.
1330 #[derive(Serialize, Deserialize, Clone, Debug)]
1331 pub struct ServerInfo {
1332 pub stats: ServerStats,
1333 pub cache_location: String,
1334 pub cache_size: Option<u64>,
1335 pub max_cache_size: Option<u64>,
1336 }
1337
1338 /// Status of the dist client.
1339 #[derive(Serialize, Deserialize, Clone, Debug)]
1340 pub enum DistInfo {
1341 Disabled(String),
1342 #[cfg(feature = "dist-client")]
1343 NotConnected(Option<config::HTTPUrl>, String),
1344 #[cfg(feature = "dist-client")]
1345 SchedulerStatus(Option<config::HTTPUrl>, dist::SchedulerStatusResult),
1346 }
1347
1348 impl Default for ServerStats {
default() -> ServerStats1349 fn default() -> ServerStats {
1350 ServerStats {
1351 compile_requests: u64::default(),
1352 requests_unsupported_compiler: u64::default(),
1353 requests_not_compile: u64::default(),
1354 requests_not_cacheable: u64::default(),
1355 requests_executed: u64::default(),
1356 cache_errors: PerLanguageCount::new(),
1357 cache_hits: PerLanguageCount::new(),
1358 cache_misses: PerLanguageCount::new(),
1359 cache_timeouts: u64::default(),
1360 cache_read_errors: u64::default(),
1361 non_cacheable_compilations: u64::default(),
1362 forced_recaches: u64::default(),
1363 cache_write_errors: u64::default(),
1364 cache_writes: u64::default(),
1365 cache_write_duration: Duration::new(0, 0),
1366 cache_read_hit_duration: Duration::new(0, 0),
1367 cache_read_miss_duration: Duration::new(0, 0),
1368 compile_fails: u64::default(),
1369 not_cached: HashMap::new(),
1370 dist_compiles: HashMap::new(),
1371 dist_errors: u64::default(),
1372 }
1373 }
1374 }
1375
1376 impl ServerStats {
1377 /// Print stats to stdout in a human-readable format.
1378 ///
1379 /// Return the formatted width of each of the (name, value) columns.
print(&self) -> (usize, usize)1380 fn print(&self) -> (usize, usize) {
1381 macro_rules! set_stat {
1382 ($vec:ident, $var:expr, $name:expr) => {{
1383 // name, value, suffix length
1384 $vec.push(($name.to_string(), $var.to_string(), 0));
1385 }};
1386 }
1387
1388 macro_rules! set_lang_stat {
1389 ($vec:ident, $var:expr, $name:expr) => {{
1390 $vec.push(($name.to_string(), $var.all().to_string(), 0));
1391 let mut sorted_stats: Vec<_> = $var.counts.iter().collect();
1392 sorted_stats.sort_by_key(|v| v.0);
1393 for (lang, count) in sorted_stats.iter() {
1394 $vec.push((format!("{} ({})", $name, lang), count.to_string(), 0));
1395 }
1396 }};
1397 }
1398
1399 macro_rules! set_duration_stat {
1400 ($vec:ident, $dur:expr, $num:expr, $name:expr) => {{
1401 let s = if $num > 0 {
1402 $dur / $num as u32
1403 } else {
1404 Default::default()
1405 };
1406 // name, value, suffix length
1407 $vec.push(($name.to_string(), util::fmt_duration_as_secs(&s), 2));
1408 }};
1409 }
1410
1411 let mut stats_vec = vec![];
1412 //TODO: this would be nice to replace with a custom derive implementation.
1413 set_stat!(stats_vec, self.compile_requests, "Compile requests");
1414 set_stat!(
1415 stats_vec,
1416 self.requests_executed,
1417 "Compile requests executed"
1418 );
1419 set_lang_stat!(stats_vec, self.cache_hits, "Cache hits");
1420 set_lang_stat!(stats_vec, self.cache_misses, "Cache misses");
1421 set_stat!(stats_vec, self.cache_timeouts, "Cache timeouts");
1422 set_stat!(stats_vec, self.cache_read_errors, "Cache read errors");
1423 set_stat!(stats_vec, self.forced_recaches, "Forced recaches");
1424 set_stat!(stats_vec, self.cache_write_errors, "Cache write errors");
1425 set_stat!(stats_vec, self.compile_fails, "Compilation failures");
1426 set_lang_stat!(stats_vec, self.cache_errors, "Cache errors");
1427 set_stat!(
1428 stats_vec,
1429 self.non_cacheable_compilations,
1430 "Non-cacheable compilations"
1431 );
1432 set_stat!(
1433 stats_vec,
1434 self.requests_not_cacheable,
1435 "Non-cacheable calls"
1436 );
1437 set_stat!(
1438 stats_vec,
1439 self.requests_not_compile,
1440 "Non-compilation calls"
1441 );
1442 set_stat!(
1443 stats_vec,
1444 self.requests_unsupported_compiler,
1445 "Unsupported compiler calls"
1446 );
1447 set_duration_stat!(
1448 stats_vec,
1449 self.cache_write_duration,
1450 self.cache_writes,
1451 "Average cache write"
1452 );
1453 set_duration_stat!(
1454 stats_vec,
1455 self.cache_read_miss_duration,
1456 self.cache_misses.all(),
1457 "Average cache read miss"
1458 );
1459 set_duration_stat!(
1460 stats_vec,
1461 self.cache_read_hit_duration,
1462 self.cache_hits.all(),
1463 "Average cache read hit"
1464 );
1465 set_stat!(
1466 stats_vec,
1467 self.dist_errors,
1468 "Failed distributed compilations"
1469 );
1470 let name_width = stats_vec
1471 .iter()
1472 .map(|&(ref n, _, _)| n.len())
1473 .max()
1474 .unwrap();
1475 let stat_width = stats_vec
1476 .iter()
1477 .map(|&(_, ref s, _)| s.len())
1478 .max()
1479 .unwrap();
1480 for (name, stat, suffix_len) in stats_vec {
1481 println!(
1482 "{:<name_width$} {:>stat_width$}",
1483 name,
1484 stat,
1485 name_width = name_width,
1486 stat_width = stat_width + suffix_len
1487 );
1488 }
1489 if !self.dist_compiles.is_empty() {
1490 println!("\nSuccessful distributed compiles");
1491 let mut counts: Vec<_> = self.dist_compiles.iter().collect();
1492 counts.sort_by(|(_, c1), (_, c2)| c1.cmp(c2).reverse());
1493 for (reason, count) in counts {
1494 println!(
1495 " {:<name_width$} {:>stat_width$}",
1496 reason,
1497 count,
1498 name_width = name_width - 2,
1499 stat_width = stat_width
1500 );
1501 }
1502 }
1503 if !self.not_cached.is_empty() {
1504 println!("\nNon-cacheable reasons:");
1505 let mut counts: Vec<_> = self.not_cached.iter().collect();
1506 counts.sort_by(|(_, c1), (_, c2)| c1.cmp(c2).reverse());
1507 for (reason, count) in counts {
1508 println!(
1509 "{:<name_width$} {:>stat_width$}",
1510 reason,
1511 count,
1512 name_width = name_width,
1513 stat_width = stat_width
1514 );
1515 }
1516 println!();
1517 }
1518 (name_width, stat_width)
1519 }
1520 }
1521
1522 impl ServerInfo {
1523 /// Print info to stdout in a human-readable format.
print(&self)1524 pub fn print(&self) {
1525 let (name_width, stat_width) = self.stats.print();
1526 println!(
1527 "{:<name_width$} {}",
1528 "Cache location",
1529 self.cache_location,
1530 name_width = name_width
1531 );
1532 for &(name, val) in &[
1533 ("Cache size", &self.cache_size),
1534 ("Max cache size", &self.max_cache_size),
1535 ] {
1536 if let Some(val) = *val {
1537 let (val, suffix) = match NumberPrefix::binary(val as f64) {
1538 NumberPrefix::Standalone(bytes) => (bytes.to_string(), "bytes".to_string()),
1539 NumberPrefix::Prefixed(prefix, n) => {
1540 (format!("{:.0}", n), format!("{}B", prefix))
1541 }
1542 };
1543 println!(
1544 "{:<name_width$} {:>stat_width$} {}",
1545 name,
1546 val,
1547 suffix,
1548 name_width = name_width,
1549 stat_width = stat_width
1550 );
1551 }
1552 }
1553 }
1554 }
1555
1556 enum Frame<R, R1> {
1557 Body { chunk: Option<R1> },
1558 Message { message: R, body: bool },
1559 }
1560
1561 struct Body<R> {
1562 receiver: mpsc::Receiver<Result<R>>,
1563 }
1564
1565 impl<R> Body<R> {
pair() -> (mpsc::Sender<Result<R>>, Self)1566 fn pair() -> (mpsc::Sender<Result<R>>, Self) {
1567 let (tx, rx) = mpsc::channel(0);
1568 (tx, Body { receiver: rx })
1569 }
1570 }
1571
1572 impl<R> futures_03::Stream for Body<R> {
1573 type Item = Result<R>;
poll_next( mut self: Pin<&mut Self>, _cx: &mut Context<'_>, ) -> std::task::Poll<Option<Self::Item>>1574 fn poll_next(
1575 mut self: Pin<&mut Self>,
1576 _cx: &mut Context<'_>,
1577 ) -> std::task::Poll<Option<Self::Item>> {
1578 match Pin::new(&mut self.receiver).poll().unwrap() {
1579 Async::Ready(item) => std::task::Poll::Ready(item),
1580 Async::NotReady => std::task::Poll::Pending,
1581 }
1582 }
1583 }
1584
1585 enum Message<R, B> {
1586 WithBody(R, B),
1587 WithoutBody(R),
1588 }
1589
1590 impl<R, B> Message<R, B> {
into_inner(self) -> R1591 fn into_inner(self) -> R {
1592 match self {
1593 Message::WithBody(r, _) => r,
1594 Message::WithoutBody(r) => r,
1595 }
1596 }
1597 }
1598
1599 /// Implementation of `Stream + Sink` that tokio-proto is expecting
1600 ///
1601 /// This type is composed of a few layers:
1602 ///
1603 /// * First there's `I`, the I/O object implementing `AsyncRead` and
1604 /// `AsyncWrite`
1605 /// * Next that's framed using the `length_delimited` module in tokio-io giving
1606 /// us a `Sink` and `Stream` of `BytesMut`.
1607 /// * Next that sink/stream is wrapped in `ReadBincode` which will cause the
1608 /// `Stream` implementation to switch from `BytesMut` to `Request` by parsing
1609 /// the bytes bincode.
1610 /// * Finally that sink/stream is wrapped in `WriteBincode` which will cause the
1611 /// `Sink` implementation to switch from `BytesMut` to `Response` meaning that
1612 /// all `Response` types pushed in will be converted to `BytesMut` and pushed
1613 /// below.
1614 struct SccacheTransport<I: AsyncRead + AsyncWrite> {
1615 inner: WriteBincode<ReadBincode<Framed<I>, Request>, Response>,
1616 }
1617
1618 impl<I: AsyncRead + AsyncWrite> Stream for SccacheTransport<I> {
1619 type Item = Message<Request, Body<()>>;
1620 type Error = io::Error;
1621
poll(&mut self) -> Poll<Option<Self::Item>, io::Error>1622 fn poll(&mut self) -> Poll<Option<Self::Item>, io::Error> {
1623 let msg = try_ready!(self.inner.poll().map_err(|e| {
1624 error!("SccacheTransport::poll failed: {}", e);
1625 io::Error::new(io::ErrorKind::Other, e)
1626 }));
1627 Ok(msg.map(Message::WithoutBody).into())
1628 }
1629 }
1630
1631 impl<I: AsyncRead + AsyncWrite> Sink for SccacheTransport<I> {
1632 type SinkItem = Frame<Response, Response>;
1633 type SinkError = io::Error;
1634
start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, io::Error>1635 fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, io::Error> {
1636 match item {
1637 Frame::Message { message, body } => match self.inner.start_send(message)? {
1638 AsyncSink::Ready => Ok(AsyncSink::Ready),
1639 AsyncSink::NotReady(message) => {
1640 Ok(AsyncSink::NotReady(Frame::Message { message, body }))
1641 }
1642 },
1643 Frame::Body { chunk: Some(chunk) } => match self.inner.start_send(chunk)? {
1644 AsyncSink::Ready => Ok(AsyncSink::Ready),
1645 AsyncSink::NotReady(chunk) => {
1646 Ok(AsyncSink::NotReady(Frame::Body { chunk: Some(chunk) }))
1647 }
1648 },
1649 Frame::Body { chunk: None } => Ok(AsyncSink::Ready),
1650 }
1651 }
1652
poll_complete(&mut self) -> Poll<(), io::Error>1653 fn poll_complete(&mut self) -> Poll<(), io::Error> {
1654 self.inner.poll_complete()
1655 }
1656
close(&mut self) -> Poll<(), io::Error>1657 fn close(&mut self) -> Poll<(), io::Error> {
1658 self.inner.close()
1659 }
1660 }
1661
1662 struct ShutdownOrInactive {
1663 rx: mpsc::Receiver<ServerMessage>,
1664 timeout: Option<Delay>,
1665 timeout_dur: Duration,
1666 }
1667
1668 impl Future for ShutdownOrInactive {
1669 type Item = ();
1670 type Error = io::Error;
1671
poll(&mut self) -> Poll<(), io::Error>1672 fn poll(&mut self) -> Poll<(), io::Error> {
1673 loop {
1674 match self.rx.poll().unwrap() {
1675 Async::NotReady => break,
1676 // Shutdown received!
1677 Async::Ready(Some(ServerMessage::Shutdown)) => return Ok(().into()),
1678 Async::Ready(Some(ServerMessage::Request)) => {
1679 if self.timeout_dur != Duration::new(0, 0) {
1680 self.timeout = Some(Delay::new(Instant::now() + self.timeout_dur));
1681 }
1682 }
1683 // All services have shut down, in theory this isn't possible...
1684 Async::Ready(None) => return Ok(().into()),
1685 }
1686 }
1687 match self.timeout {
1688 None => Ok(Async::NotReady),
1689 Some(ref mut timeout) => timeout
1690 .poll()
1691 .map_err(|err| io::Error::new(io::ErrorKind::Other, err)),
1692 }
1693 }
1694 }
1695
1696 /// Helper future which tracks the `ActiveInfo` below. This future will resolve
1697 /// once all instances of `ActiveInfo` have been dropped.
1698 struct WaitUntilZero {
1699 info: Rc<RefCell<Info>>,
1700 }
1701
1702 struct ActiveInfo {
1703 info: Rc<RefCell<Info>>,
1704 }
1705
1706 struct Info {
1707 active: usize,
1708 waker: Option<Waker>,
1709 }
1710
1711 impl WaitUntilZero {
new() -> (WaitUntilZero, ActiveInfo)1712 fn new() -> (WaitUntilZero, ActiveInfo) {
1713 let info = Rc::new(RefCell::new(Info {
1714 active: 1,
1715 waker: None,
1716 }));
1717
1718 (WaitUntilZero { info: info.clone() }, ActiveInfo { info })
1719 }
1720 }
1721
1722 impl Clone for ActiveInfo {
clone(&self) -> ActiveInfo1723 fn clone(&self) -> ActiveInfo {
1724 self.info.borrow_mut().active += 1;
1725 ActiveInfo {
1726 info: self.info.clone(),
1727 }
1728 }
1729 }
1730
1731 impl Drop for ActiveInfo {
drop(&mut self)1732 fn drop(&mut self) {
1733 let mut info = self.info.borrow_mut();
1734 info.active -= 1;
1735 if info.active == 0 {
1736 if let Some(waker) = info.waker.take() {
1737 waker.wake();
1738 }
1739 }
1740 }
1741 }
1742
1743 impl std::future::Future for WaitUntilZero {
1744 type Output = io::Result<()>;
1745
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output>1746 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
1747 let mut info = self.info.borrow_mut();
1748 if info.active == 0 {
1749 std::task::Poll::Ready(Ok(()))
1750 } else {
1751 info.waker = Some(cx.waker().clone());
1752 std::task::Poll::Pending
1753 }
1754 }
1755 }
1756