extern crate base64;
#[macro_use]
extern crate clap;
extern crate crossbeam_utils;
extern crate env_logger;
extern crate flate2;
extern crate hyperx;
extern crate jsonwebtoken as jwt;
extern crate libmount;
#[macro_use]
extern crate log;
extern crate nix;
extern crate openssl;
extern crate rand;
extern crate reqwest;
extern crate sccache;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate syslog;
extern crate tar;
extern crate void;

use anyhow::{bail, Context, Error, Result};
use clap::{App, Arg, ArgMatches, SubCommand};
use rand::{rngs::OsRng, RngCore};
use sccache::config::{
    scheduler as scheduler_config, server as server_config, INSECURE_DIST_CLIENT_TOKEN,
};
use sccache::dist::{
    self, AllocJobResult, AssignJobResult, BuilderIncoming, CompileCommand, HeartbeatServerResult,
    InputsReader, JobAlloc, JobAuthorizer, JobComplete, JobId, JobState, RunJobResult,
    SchedulerIncoming, SchedulerOutgoing, SchedulerStatusResult, ServerId, ServerIncoming,
    ServerNonce, ServerOutgoing, SubmitToolchainResult, TcCache, Toolchain, ToolchainReader,
    UpdateJobStateResult,
};
use sccache::util::daemonize;
use std::collections::{btree_map, BTreeMap, HashMap, HashSet};
use std::env;
use std::io;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::time::{Duration, Instant};
use syslog::Facility;

mod build;
mod token_check;

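/// Placeholder auth token used when scheduler/server authentication is
/// configured as insecure.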
pub const INSECURE_DIST_SERVER_TOKEN: &str = "dangerously_insecure_server";

enum Command {
    Auth(AuthSubcommand),
    Scheduler(scheduler_config::Config),
    Server(server_config::Config),
}

enum AuthSubcommand {
    Base64 {
        num_bytes: usize,
    },
    JwtHS256ServerToken {
        secret_key: String,
        server_id: ServerId,
    },
}

// Only supported on x86_64 Linux machines
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn main() {
    init_logging();
    std::process::exit(match parse() {
        Ok(cmd) => match run(cmd) {
            Ok(s) => s,
            Err(e) => {
                eprintln!("sccache-dist: error: {}", e);

                for e in e.chain().skip(1) {
                    eprintln!("sccache-dist: caused by: {}", e);
                }
                2
            }
        },
        Err(e) => {
            println!("sccache-dist: {}", e);
            for e in e.chain().skip(1) {
                println!("sccache-dist: caused by: {}", e);
            }
            get_app().print_help().unwrap();
            println!();
            1
        }
    });
}

/// These correspond to the values of `log::LevelFilter`.
const LOG_LEVELS: &[&str] = &["error", "warn", "info", "debug", "trace"];

pub fn get_app<'a, 'b>() -> App<'a, 'b> {
    App::new(env!("CARGO_PKG_NAME"))
        .version(env!("CARGO_PKG_VERSION"))
        .subcommand(
            SubCommand::with_name("auth")
                .subcommand(SubCommand::with_name("generate-jwt-hs256-key"))
                .subcommand(
                    SubCommand::with_name("generate-jwt-hs256-server-token")
                        .arg(Arg::from_usage(
                            "--server <SERVER_ADDR> 'Generate a key for the specified server'",
                        ))
                        .arg(
                            Arg::from_usage(
                                "--secret-key [KEY] 'Use specified key to create the token'",
                            )
                            .required_unless("config"),
                        )
                        .arg(
                            Arg::from_usage(
                                "--config [PATH] 'Use the key from the scheduler config file'",
                            )
                            .required_unless("secret-key"),
                        ),
                )
                .subcommand(
                    SubCommand::with_name("generate-shared-token").arg(
                        Arg::from_usage(
                            "--bits [BITS] 'Use the specified number of bits of randomness'",
                        )
                        .default_value("256"),
                    ),
                ),
        )
        .subcommand(
            SubCommand::with_name("scheduler")
                .arg(Arg::from_usage(
                    "--config <PATH> 'Use the scheduler config file at PATH'",
                ))
                .arg(
                    Arg::from_usage("--syslog <LEVEL> 'Log to the syslog with LEVEL'")
                        .required(false)
                        .possible_values(LOG_LEVELS),
                ),
        )
        .subcommand(
            SubCommand::with_name("server")
                .arg(Arg::from_usage(
                    "--config <PATH> 'Use the server config file at PATH'",
                ))
                .arg(
                    Arg::from_usage("--syslog <LEVEL> 'Log to the syslog with LEVEL'")
                        .required(false)
                        .possible_values(LOG_LEVELS),
                ),
        )
}

fn check_init_syslog<'a>(name: &str, matches: &ArgMatches<'a>) {
    if matches.is_present("syslog") {
        let level = value_t!(matches, "syslog", log::LevelFilter).unwrap_or_else(|e| e.exit());
        drop(syslog::init(Facility::LOG_DAEMON, level, Some(name)));
    }
}

fn parse() -> Result<Command> {
    let matches = get_app().get_matches();
    Ok(match matches.subcommand() {
        ("auth", Some(matches)) => {
            Command::Auth(match matches.subcommand() {
                ("generate-jwt-hs256-key", Some(_matches)) => {
                    // Size based on https://briansmith.org/rustdoc/ring/hmac/fn.recommended_key_len.html
                    AuthSubcommand::Base64 { num_bytes: 256 / 8 }
                }
                ("generate-jwt-hs256-server-token", Some(matches)) => {
                    let server_id = ServerId::new(value_t_or_exit!(matches, "server", SocketAddr));
                    let secret_key = if let Some(config_path) =
                        matches.value_of("config").map(Path::new)
                    {
                        if let Some(config) = scheduler_config::from_path(config_path)? {
                            match config.server_auth {
                                scheduler_config::ServerAuth::JwtHS256 { secret_key } => secret_key,
                                scheduler_config::ServerAuth::Insecure
                                | scheduler_config::ServerAuth::Token { token: _ } => {
                                    bail!("Scheduler not configured with JWT HS256")
                                }
                            }
                        } else {
                            bail!("Could not load config")
                        }
                    } else {
                        matches
                            .value_of("secret-key")
                            .expect("missing secret-key in parsed subcommand")
                            .to_owned()
                    };
                    AuthSubcommand::JwtHS256ServerToken {
                        secret_key,
                        server_id,
                    }
                }
                ("generate-shared-token", Some(matches)) => {
                    let bits = value_t_or_exit!(matches, "bits", usize);
                    if bits % 8 != 0 || bits < 64 || bits > 4096 {
                        bail!("Number of bits must be a multiple of 8, between 64 and 4096")
                    }
                    AuthSubcommand::Base64 {
                        num_bytes: bits / 8,
                    }
                }
                _ => bail!("No subcommand of auth specified"),
            })
        }
        ("scheduler", Some(matches)) => {
            let config_path = Path::new(
                matches
                    .value_of("config")
                    .expect("missing config in parsed subcommand"),
            );
            check_init_syslog("sccache-scheduler", &matches);
            if let Some(config) = scheduler_config::from_path(config_path)? {
                Command::Scheduler(config)
            } else {
                bail!("Could not load config")
            }
        }
        ("server", Some(matches)) => {
            let config_path = Path::new(
                matches
                    .value_of("config")
                    .expect("missing config in parsed subcommand"),
            );
            check_init_syslog("sccache-buildserver", &matches);
            if let Some(config) = server_config::from_path(config_path)? {
                Command::Server(config)
            } else {
                bail!("Could not load config")
            }
        }
        _ => bail!("No subcommand specified"),
    })
}

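/// Builds the plain-text token a server presents to the scheduler: the
/// server's address and the shared auth token, space-separated.
/// `check_server_token` parses and verifies such a token.
///
/// A minimal sketch of the round trip (illustrative only; the address is an
/// arbitrary example):
///
/// ```ignore
/// let id = ServerId::new("10.0.0.1:10501".parse().unwrap());
/// let token = create_server_token(id, "shared-secret");
/// assert_eq!(check_server_token(&token, "shared-secret"), Some(id));
/// ```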
fn create_server_token(server_id: ServerId, auth_token: &str) -> String {
    format!("{} {}", server_id.addr(), auth_token)
}
fn check_server_token(server_token: &str, auth_token: &str) -> Option<ServerId> {
    let mut split = server_token.splitn(2, |c| c == ' ');
    let server_addr = split.next().and_then(|addr| addr.parse().ok())?;
    match split.next() {
        Some(t) if t == auth_token => Some(ServerId::new(server_addr)),
        Some(_) | None => None,
    }
}

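/// Claims embedded in a server JWT; only the server id is carried.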
#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ServerJwt {
    server_id: ServerId,
}
fn create_jwt_server_token(
    server_id: ServerId,
    header: &jwt::Header,
    key: &[u8],
) -> Result<String> {
    let key = jwt::EncodingKey::from_secret(key);
    jwt::encode(&header, &ServerJwt { server_id }, &key).map_err(Into::into)
}
fn dangerous_insecure_extract_jwt_server_token(server_token: &str) -> Option<ServerId> {
    jwt::dangerous_insecure_decode::<ServerJwt>(&server_token)
        .map(|res| res.claims.server_id)
        .ok()
}
fn check_jwt_server_token(
    server_token: &str,
    key: &[u8],
    validation: &jwt::Validation,
) -> Option<ServerId> {
    let key = jwt::DecodingKey::from_secret(key);
    jwt::decode::<ServerJwt>(server_token, &key, validation)
        .map(|res| res.claims.server_id)
        .ok()
}

fn run(command: Command) -> Result<i32> {
    match command {
        Command::Auth(AuthSubcommand::Base64 { num_bytes }) => {
            let mut bytes = vec![0; num_bytes];
            OsRng.fill_bytes(&mut bytes);
            // As long as it can be copied, it doesn't matter if this is base64 or hex etc
            println!("{}", base64::encode_config(&bytes, base64::URL_SAFE_NO_PAD));
            Ok(0)
        }
        Command::Auth(AuthSubcommand::JwtHS256ServerToken {
            secret_key,
            server_id,
        }) => {
            let header = jwt::Header::new(jwt::Algorithm::HS256);
            let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)?;
            let token = create_jwt_server_token(server_id, &header, &secret_key)
                .context("Failed to create server token")?;
            println!("{}", token);
            Ok(0)
        }

        Command::Scheduler(scheduler_config::Config {
            public_addr,
            client_auth,
            server_auth,
        }) => {
            let check_client_auth: Box<dyn dist::http::ClientAuthCheck> = match client_auth {
                scheduler_config::ClientAuth::Insecure => Box::new(token_check::EqCheck::new(
                    INSECURE_DIST_CLIENT_TOKEN.to_owned(),
                )),
                scheduler_config::ClientAuth::Token { token } => {
                    Box::new(token_check::EqCheck::new(token))
                }
                scheduler_config::ClientAuth::JwtValidate {
                    audience,
                    issuer,
                    jwks_url,
                } => Box::new(
                    token_check::ValidJWTCheck::new(audience, issuer, &jwks_url)
                        .context("Failed to create a checker for valid JWTs")?,
                ),
                scheduler_config::ClientAuth::Mozilla { required_groups } => {
                    Box::new(token_check::MozillaCheck::new(required_groups))
                }
                scheduler_config::ClientAuth::ProxyToken { url, cache_secs } => {
                    Box::new(token_check::ProxyTokenCheck::new(url, cache_secs))
                }
            };

            let check_server_auth: dist::http::ServerAuthCheck = match server_auth {
                scheduler_config::ServerAuth::Insecure => {
                    warn!("Scheduler starting with DANGEROUSLY_INSECURE server authentication");
                    let token = INSECURE_DIST_SERVER_TOKEN;
                    Box::new(move |server_token| check_server_token(server_token, &token))
                }
                scheduler_config::ServerAuth::Token { token } => {
                    Box::new(move |server_token| check_server_token(server_token, &token))
                }
                scheduler_config::ServerAuth::JwtHS256 { secret_key } => {
                    let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)
                        .context("Secret key base64 invalid")?;
                    if secret_key.len() != 256 / 8 {
                        bail!("Size of secret key incorrect")
                    }
                    let validation = jwt::Validation {
                        leeway: 0,
                        validate_exp: false,
                        validate_nbf: false,
                        aud: None,
                        iss: None,
                        sub: None,
                        algorithms: vec![jwt::Algorithm::HS256],
                    };
                    Box::new(move |server_token| {
                        check_jwt_server_token(server_token, &secret_key, &validation)
                    })
                }
            };

            daemonize()?;
            let scheduler = Scheduler::new();
            let http_scheduler = dist::http::Scheduler::new(
                public_addr,
                scheduler,
                check_client_auth,
                check_server_auth,
            );
            void::unreachable(http_scheduler.start()?);
        }

        Command::Server(server_config::Config {
            builder,
            cache_dir,
            public_addr,
            scheduler_url,
            scheduler_auth,
            toolchain_cache_size,
        }) => {
            let builder: Box<dyn dist::BuilderIncoming> = match builder {
                server_config::BuilderType::Docker => {
                    Box::new(build::DockerBuilder::new().context("Docker builder failed to start")?)
                }
                server_config::BuilderType::Overlay {
                    bwrap_path,
                    build_dir,
                } => Box::new(
                    build::OverlayBuilder::new(bwrap_path, build_dir)
                        .context("Overlay builder failed to start")?,
                ),
            };

            let server_id = ServerId::new(public_addr);
            let scheduler_auth = match scheduler_auth {
                server_config::SchedulerAuth::Insecure => {
                    warn!("Server starting with DANGEROUSLY_INSECURE scheduler authentication");
                    create_server_token(server_id, &INSECURE_DIST_SERVER_TOKEN)
                }
                server_config::SchedulerAuth::Token { token } => {
                    create_server_token(server_id, &token)
                }
                server_config::SchedulerAuth::JwtToken { token } => {
                    let token_server_id: ServerId =
                        dangerous_insecure_extract_jwt_server_token(&token)
                            .context("Could not decode scheduler auth jwt")?;
                    if token_server_id != server_id {
                        bail!(
                            "JWT server id ({:?}) did not match configured server id ({:?})",
                            token_server_id,
                            server_id
                        )
                    }
                    token
                }
            };

            let server = Server::new(builder, &cache_dir, toolchain_cache_size)
                .context("Failed to create sccache server instance")?;
            let http_server = dist::http::Server::new(
                public_addr,
                scheduler_url.to_url(),
                scheduler_auth,
                server,
            )
            .context("Failed to create sccache HTTP server instance")?;
            void::unreachable(http_server.start()?)
        }
    }
}

fn init_logging() {
    if env::var("RUST_LOG").is_ok() {
        match env_logger::try_init() {
            Ok(_) => (),
            Err(e) => panic!("Failed to initialize logging: {:?}", e),
        }
    }
}

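// Scheduler tuning parameters: the per-core load above which a server is
// considered full, how long a server is avoided after it reports an error,
// and how long an assigned job may remain unclaimed (pending/ready) before
// the scheduler de-allocates it.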
const MAX_PER_CORE_LOAD: f64 = 10f64;
const SERVER_REMEMBER_ERROR_TIMEOUT: Duration = Duration::from_secs(300);
const UNCLAIMED_PENDING_TIMEOUT: Duration = Duration::from_secs(300);
const UNCLAIMED_READY_TIMEOUT: Duration = Duration::from_secs(60);

#[derive(Copy, Clone)]
struct JobDetail {
    server_id: ServerId,
    state: JobState,
}

// To avoid deadlocking, make sure to do all locking at once (i.e. no further locking in a downward scope),
// in alphabetical order
pub struct Scheduler {
    job_count: AtomicUsize,

    // Currently running jobs, can never be Complete
    jobs: Mutex<BTreeMap<JobId, JobDetail>>,

    servers: Mutex<HashMap<ServerId, ServerDetails>>,
}

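/// Scheduler-side bookkeeping kept for each registered build server.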
struct ServerDetails {
    jobs_assigned: HashSet<JobId>,
    // Jobs assigned that haven't seen a state change. Can only be pending
    // or ready.
    jobs_unclaimed: HashMap<JobId, Instant>,
    last_seen: Instant,
    last_error: Option<Instant>,
    num_cpus: usize,
    server_nonce: ServerNonce,
    job_authorizer: Box<dyn JobAuthorizer>,
}

impl Scheduler {
    pub fn new() -> Self {
        Scheduler {
            job_count: AtomicUsize::new(0),
            jobs: Mutex::new(BTreeMap::new()),
            servers: Mutex::new(HashMap::new()),
        }
    }

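    /// Drops servers whose last heartbeat is older than
    /// `dist::http::HEARTBEAT_TIMEOUT`, along with any jobs still assigned
    /// to them. Both lock guards are passed in so callers take all locks up
    /// front, in the order documented on `Scheduler`.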
    fn prune_servers(
        &self,
        servers: &mut MutexGuard<HashMap<ServerId, ServerDetails>>,
        jobs: &mut MutexGuard<BTreeMap<JobId, JobDetail>>,
    ) {
        let now = Instant::now();

        let mut dead_servers = Vec::new();

        for (&server_id, details) in servers.iter_mut() {
            if now.duration_since(details.last_seen) > dist::http::HEARTBEAT_TIMEOUT {
                dead_servers.push(server_id);
            }
        }

        for server_id in dead_servers {
            warn!(
                "Server {} appears to be dead, pruning it in the scheduler",
                server_id.addr()
            );
            let server_details = servers
                .remove(&server_id)
                .expect("server went missing from map");
            for job_id in server_details.jobs_assigned {
                warn!(
                    "Non-terminated job {} was cleaned up in server pruning",
                    job_id
                );
                // A job may be missing here if it failed to allocate
                // initially, so just warn if it's not present.
                if jobs.remove(&job_id).is_none() {
                    warn!(
                        "Non-terminated job {} assignment originally failed.",
                        job_id
                    );
                }
            }
        }
    }
}

impl SchedulerIncoming for Scheduler {
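    /// Picks the least-loaded server below `MAX_PER_CORE_LOAD` (preferring
    /// servers without a recent error, otherwise the one whose last error is
    /// oldest), allocates a fresh job id and auth token, and asks that server
    /// to accept the job.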
    fn handle_alloc_job(
        &self,
        requester: &dyn SchedulerOutgoing,
        tc: Toolchain,
    ) -> Result<AllocJobResult> {
        let (job_id, server_id, auth) = {
            // LOCKS
            let mut servers = self.servers.lock().unwrap();

            let res = {
                let mut best = None;
                let mut best_err = None;
                let mut best_load: f64 = MAX_PER_CORE_LOAD;
                let now = Instant::now();
                for (&server_id, details) in servers.iter_mut() {
                    let load = details.jobs_assigned.len() as f64 / details.num_cpus as f64;

                    if let Some(last_error) = details.last_error {
                        if load < MAX_PER_CORE_LOAD {
                            if now.duration_since(last_error) > SERVER_REMEMBER_ERROR_TIMEOUT {
                                details.last_error = None;
                            }
                            match best_err {
                                Some((
                                    _,
                                    &mut ServerDetails {
                                        last_error: Some(best_last_err),
                                        ..
                                    },
                                )) => {
                                    if last_error < best_last_err {
                                        trace!(
                                            "Selected {:?}, its most recent error is {:?} ago",
                                            server_id,
                                            now - last_error
                                        );
                                        best_err = Some((server_id, details));
                                    }
                                }
                                _ => {
                                    trace!(
                                        "Selected {:?}, its most recent error is {:?} ago",
                                        server_id,
                                        now - last_error
                                    );
                                    best_err = Some((server_id, details));
                                }
                            }
                        }
                    } else if load < best_load {
                        best = Some((server_id, details));
                        trace!("Selected {:?} as the server with the best load", server_id);
                        best_load = load;
                        if load == 0f64 {
                            break;
                        }
                    }
                }

                // Assign the job to our best choice
                if let Some((server_id, server_details)) = best.or(best_err) {
                    let job_count = self.job_count.fetch_add(1, Ordering::SeqCst) as u64;
                    let job_id = JobId(job_count);
                    assert!(server_details.jobs_assigned.insert(job_id));
                    assert!(server_details
                        .jobs_unclaimed
                        .insert(job_id, Instant::now())
                        .is_none());

                    info!(
                        "Job {} created and will be assigned to server {:?}",
                        job_id, server_id
                    );
                    let auth = server_details
                        .job_authorizer
                        .generate_token(job_id)
                        .map_err(Error::from)
                        .context("Could not create an auth token for this job")?;
                    Some((job_id, server_id, auth))
                } else {
                    None
                }
            };

            if let Some(res) = res {
                res
            } else {
                let msg = format!(
                    "Insufficient capacity across {} available servers",
                    servers.len()
                );
                return Ok(AllocJobResult::Fail { msg });
            }
        };
        let AssignJobResult {
            state,
            need_toolchain,
        } = requester
            .do_assign_job(server_id, job_id, tc, auth.clone())
            .with_context(|| {
                // LOCKS
                let mut servers = self.servers.lock().unwrap();
                if let Some(entry) = servers.get_mut(&server_id) {
                    entry.last_error = Some(Instant::now());
                    entry.jobs_unclaimed.remove(&job_id);
                    if !entry.jobs_assigned.remove(&job_id) {
                        "assign job failed and job not known to the server"
                    } else {
                        "assign job failed, job un-assigned from the server"
                    }
                } else {
                    "assign job failed and server not known"
                }
            })?;
        {
            // LOCKS
            let mut jobs = self.jobs.lock().unwrap();

            info!(
                "Job {} successfully assigned and saved with state {:?}",
                job_id, state
            );
            assert!(jobs
                .insert(job_id, JobDetail { server_id, state })
                .is_none());
        }
        let job_alloc = JobAlloc {
            auth,
            job_id,
            server_id,
        };
        Ok(AllocJobResult::Success {
            job_alloc,
            need_toolchain,
        })
    }

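    /// Registers a server or refreshes its heartbeat. A changed nonce means
    /// the server restarted, so its previously assigned jobs are dropped and
    /// it is re-registered; jobs left unclaimed for too long are also
    /// de-allocated here.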
    fn handle_heartbeat_server(
        &self,
        server_id: ServerId,
        server_nonce: ServerNonce,
        num_cpus: usize,
        job_authorizer: Box<dyn JobAuthorizer>,
    ) -> Result<HeartbeatServerResult> {
        if num_cpus == 0 {
            bail!("Invalid number of CPUs (0) specified in heartbeat")
        }

        // LOCKS
        let mut jobs = self.jobs.lock().unwrap();
        let mut servers = self.servers.lock().unwrap();

        self.prune_servers(&mut servers, &mut jobs);

        match servers.get_mut(&server_id) {
            Some(ref mut details) if details.server_nonce == server_nonce => {
                let now = Instant::now();
                details.last_seen = now;

                let mut stale_jobs = Vec::new();
                for (&job_id, &last_seen) in details.jobs_unclaimed.iter() {
                    if now.duration_since(last_seen) < UNCLAIMED_READY_TIMEOUT {
                        continue;
                    }
                    if let Some(detail) = jobs.get(&job_id) {
                        match detail.state {
                            JobState::Ready => {
                                stale_jobs.push(job_id);
                            }
                            JobState::Pending => {
                                if now.duration_since(last_seen) > UNCLAIMED_PENDING_TIMEOUT {
                                    stale_jobs.push(job_id);
                                }
                            }
                            state => {
                                warn!("Invalid unclaimed job state for {}: {}", job_id, state);
                            }
                        }
                    } else {
                        warn!("Unknown stale job {}", job_id);
                        stale_jobs.push(job_id);
                    }
                }

                if !stale_jobs.is_empty() {
                    warn!(
                        "The following stale jobs will be de-allocated: {:?}",
                        stale_jobs
                    );

                    for job_id in stale_jobs {
                        if !details.jobs_assigned.remove(&job_id) {
                            warn!(
                                "Stale job for server {} not assigned: {}",
                                server_id.addr(),
                                job_id
                            );
                        }
                        if details.jobs_unclaimed.remove(&job_id).is_none() {
                            warn!(
                                "Unknown stale job for server {}: {}",
                                server_id.addr(),
                                job_id
                            );
                        }
                        if jobs.remove(&job_id).is_none() {
                            warn!(
                                "Unknown stale job for server {}: {}",
                                server_id.addr(),
                                job_id
                            );
                        }
                    }
                }

                return Ok(HeartbeatServerResult { is_new: false });
            }
            Some(ref mut details) if details.server_nonce != server_nonce => {
                for job_id in details.jobs_assigned.iter() {
                    if jobs.remove(&job_id).is_none() {
                        warn!(
                            "Unknown job found when replacing server {}: {}",
                            server_id.addr(),
                            job_id
                        );
                    }
                }
            }
            _ => (),
        }
        info!("Registered new server {:?}", server_id);
        servers.insert(
            server_id,
            ServerDetails {
                last_seen: Instant::now(),
                last_error: None,
                jobs_assigned: HashSet::new(),
                jobs_unclaimed: HashMap::new(),
                num_cpus,
                server_nonce,
                job_authorizer,
            },
        );
        Ok(HeartbeatServerResult { is_new: true })
    }

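    /// Validates and applies a job state transition reported by a server
    /// (Pending -> Ready -> Started -> Complete); a completed job is removed
    /// from the scheduler's bookkeeping.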
    fn handle_update_job_state(
        &self,
        job_id: JobId,
        server_id: ServerId,
        job_state: JobState,
    ) -> Result<UpdateJobStateResult> {
        // LOCKS
        let mut jobs = self.jobs.lock().unwrap();
        let mut servers = self.servers.lock().unwrap();

        if let btree_map::Entry::Occupied(mut entry) = jobs.entry(job_id) {
            let job_detail = entry.get();
            if job_detail.server_id != server_id {
                bail!(
                    "Job id {} is not registered on server {:?}",
                    job_id,
                    server_id
                )
            }

            let now = Instant::now();
            let mut server_details = servers.get_mut(&server_id);
            if let Some(ref mut details) = server_details {
                details.last_seen = now;
            };

            match (job_detail.state, job_state) {
                (JobState::Pending, JobState::Ready) => entry.get_mut().state = job_state,
                (JobState::Ready, JobState::Started) => {
                    if let Some(details) = server_details {
                        details.jobs_unclaimed.remove(&job_id);
                    } else {
                        warn!("Job state updated, but server is not known to scheduler")
                    }
                    entry.get_mut().state = job_state
                }
                (JobState::Started, JobState::Complete) => {
                    let (job_id, _) = entry.remove_entry();
                    if let Some(entry) = server_details {
                        assert!(entry.jobs_assigned.remove(&job_id))
                    } else {
                        bail!("Job was marked as finished, but server is not known to scheduler")
                    }
                }
                (from, to) => bail!("Invalid job state transition from {} to {}", from, to),
            }
            info!("Job {} updated state to {:?}", job_id, job_state);
        } else {
            bail!("Unknown job")
        }
        Ok(UpdateJobStateResult::Success)
    }

    fn handle_status(&self) -> Result<SchedulerStatusResult> {
        // LOCKS
        let mut jobs = self.jobs.lock().unwrap();
        let mut servers = self.servers.lock().unwrap();

        self.prune_servers(&mut servers, &mut jobs);

        Ok(SchedulerStatusResult {
            num_servers: servers.len(),
            num_cpus: servers.values().map(|v| v.num_cpus).sum(),
            in_progress: jobs.len(),
        })
    }
}

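/// A build server: keeps an on-disk toolchain cache plus the toolchain
/// registered for each in-flight job, and delegates the actual compilation
/// to the configured `BuilderIncoming`.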
pub struct Server {
    builder: Box<dyn BuilderIncoming>,
    cache: Mutex<TcCache>,
    job_toolchains: Mutex<HashMap<JobId, Toolchain>>,
}

impl Server {
    pub fn new(
        builder: Box<dyn BuilderIncoming>,
        cache_dir: &Path,
        toolchain_cache_size: u64,
    ) -> Result<Server> {
        let cache = TcCache::new(&cache_dir.join("tc"), toolchain_cache_size)
            .context("Failed to create toolchain cache")?;
        Ok(Server {
            builder,
            cache: Mutex::new(cache),
            job_toolchains: Mutex::new(HashMap::new()),
        })
    }
}

impl ServerIncoming for Server {
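    /// Records the toolchain for a newly assigned job and reports whether
    /// the client still needs to submit it.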
    fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
        let need_toolchain = !self.cache.lock().unwrap().contains_toolchain(&tc);
        assert!(self
            .job_toolchains
            .lock()
            .unwrap()
            .insert(job_id, tc)
            .is_none());
        let state = if need_toolchain {
            JobState::Pending
        } else {
            // TODO: can start prepping the build environment now
            JobState::Ready
        };
        Ok(AssignJobResult {
            state,
            need_toolchain,
        })
    }
    fn handle_submit_toolchain(
        &self,
        requester: &dyn ServerOutgoing,
        job_id: JobId,
        tc_rdr: ToolchainReader,
    ) -> Result<SubmitToolchainResult> {
        requester
            .do_update_job_state(job_id, JobState::Ready)
            .context("Updating job state failed")?;
        // TODO: need to lock the toolchain until the container has started
        // TODO: can start prepping container
        let tc = match self.job_toolchains.lock().unwrap().get(&job_id).cloned() {
            Some(tc) => tc,
            None => return Ok(SubmitToolchainResult::JobNotFound),
        };
        let mut cache = self.cache.lock().unwrap();
        // TODO: this returns before reading all the data, is that valid?
        if cache.contains_toolchain(&tc) {
            return Ok(SubmitToolchainResult::Success);
        }
        Ok(cache
            .insert_with(&tc, |mut file| {
                io::copy(&mut { tc_rdr }, &mut file).map(|_| ())
            })
            .map(|_| SubmitToolchainResult::Success)
            .unwrap_or(SubmitToolchainResult::CannotCache))
    }
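    /// Runs an assigned job: marks it Started, builds with the toolchain
    /// recorded at assignment time, then marks it Complete whether or not
    /// the build succeeded.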
    fn handle_run_job(
        &self,
        requester: &dyn ServerOutgoing,
        job_id: JobId,
        command: CompileCommand,
        outputs: Vec<String>,
        inputs_rdr: InputsReader,
    ) -> Result<RunJobResult> {
        requester
            .do_update_job_state(job_id, JobState::Started)
            .context("Updating job state failed")?;
        let tc = self.job_toolchains.lock().unwrap().remove(&job_id);
        let res = match tc {
            None => Ok(RunJobResult::JobNotFound),
            Some(tc) => {
                match self
                    .builder
                    .run_build(tc, command, outputs, inputs_rdr, &self.cache)
                {
                    Err(e) => Err(e.context("run build failed")),
                    Ok(res) => Ok(RunJobResult::Complete(JobComplete {
                        output: res.output,
                        outputs: res.outputs,
                    })),
                }
            }
        };
        requester
            .do_update_job_state(job_id, JobState::Complete)
            .context("Updating job state failed")?;
        res
    }
}