extern crate base64;
#[macro_use]
extern crate clap;
extern crate crossbeam_utils;
extern crate env_logger;
extern crate flate2;
extern crate hyperx;
extern crate jsonwebtoken as jwt;
extern crate libmount;
#[macro_use]
extern crate log;
extern crate nix;
extern crate openssl;
extern crate rand;
extern crate reqwest;
extern crate sccache;
#[macro_use]
extern crate serde_derive;
extern crate serde_json;
extern crate syslog;
extern crate tar;
extern crate void;

use anyhow::{bail, Context, Error, Result};
use clap::{App, Arg, ArgMatches, SubCommand};
use rand::{rngs::OsRng, RngCore};
use sccache::config::{
    scheduler as scheduler_config, server as server_config, INSECURE_DIST_CLIENT_TOKEN,
};
use sccache::dist::{
    self, AllocJobResult, AssignJobResult, BuilderIncoming, CompileCommand, HeartbeatServerResult,
    InputsReader, JobAlloc, JobAuthorizer, JobComplete, JobId, JobState, RunJobResult,
    SchedulerIncoming, SchedulerOutgoing, SchedulerStatusResult, ServerId, ServerIncoming,
    ServerNonce, ServerOutgoing, SubmitToolchainResult, TcCache, Toolchain, ToolchainReader,
    UpdateJobStateResult,
};
use sccache::util::daemonize;
use std::collections::{btree_map, BTreeMap, HashMap, HashSet};
use std::env;
use std::io;
use std::net::SocketAddr;
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Mutex, MutexGuard};
use std::time::{Duration, Instant};
use syslog::Facility;

mod build;
mod token_check;

pub const INSECURE_DIST_SERVER_TOKEN: &str = "dangerously_insecure_server";

enum Command {
    Auth(AuthSubcommand),
    Scheduler(scheduler_config::Config),
    Server(server_config::Config),
}

enum AuthSubcommand {
    Base64 {
        num_bytes: usize,
    },
    JwtHS256ServerToken {
        secret_key: String,
        server_id: ServerId,
    },
}

// Only supported on x86_64 Linux machines
#[cfg(all(target_os = "linux", target_arch = "x86_64"))]
fn main() {
    init_logging();
    std::process::exit(match parse() {
        Ok(cmd) => match run(cmd) {
            Ok(s) => s,
            Err(e) => {
                eprintln!("sccache-dist: error: {}", e);

                for e in e.chain().skip(1) {
                    eprintln!("sccache-dist: caused by: {}", e);
                }
                2
            }
        },
        Err(e) => {
            println!("sccache-dist: {}", e);
            for e in e.chain().skip(1) {
                println!("sccache-dist: caused by: {}", e);
            }
            get_app().print_help().unwrap();
            println!();
            1
        }
    });
}

/// These correspond to the values of `log::LevelFilter`.
const LOG_LEVELS: &[&str] = &["error", "warn", "info", "debug", "trace"];

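/// Builds the clap `App` describing the `sccache-dist` command line: the `auth`
/// subcommands for generating tokens and keys, plus `scheduler` and `server`,
/// which both take `--config` and an optional `--syslog` level.
///
/// Illustrative invocations (the config path below is hypothetical):
/// `sccache-dist auth generate-shared-token --bits 256`
/// `sccache-dist scheduler --config /etc/sccache/scheduler.conf`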
pub fn get_app<'a, 'b>() -> App<'a, 'b> {
    App::new(env!("CARGO_PKG_NAME"))
        .version(env!("CARGO_PKG_VERSION"))
        .subcommand(
            SubCommand::with_name("auth")
                .subcommand(SubCommand::with_name("generate-jwt-hs256-key"))
                .subcommand(
                    SubCommand::with_name("generate-jwt-hs256-server-token")
                        .arg(Arg::from_usage(
                            "--server <SERVER_ADDR> 'Generate a key for the specified server'",
                        ))
                        .arg(
                            Arg::from_usage(
                                "--secret-key [KEY] 'Use specified key to create the token'",
                            )
                            .required_unless("config"),
                        )
                        .arg(
                            Arg::from_usage(
                                "--config [PATH] 'Use the key from the scheduler config file'",
                            )
                            .required_unless("secret-key"),
                        ),
                )
                .subcommand(
                    SubCommand::with_name("generate-shared-token").arg(
                        Arg::from_usage(
                            "--bits [BITS] 'Use the specified number of bits of randomness'",
                        )
                        .default_value("256"),
                    ),
                ),
        )
        .subcommand(
            SubCommand::with_name("scheduler")
                .arg(Arg::from_usage(
                    "--config <PATH> 'Use the scheduler config file at PATH'",
                ))
                .arg(
                    Arg::from_usage("--syslog <LEVEL> 'Log to the syslog with LEVEL'")
                        .required(false)
                        .possible_values(LOG_LEVELS),
                ),
        )
        .subcommand(
            SubCommand::with_name("server")
                .arg(Arg::from_usage(
                    "--config <PATH> 'Use the server config file at PATH'",
                ))
                .arg(
                    Arg::from_usage("--syslog <LEVEL> 'Log to the syslog with LEVEL'")
                        .required(false)
                        .possible_values(LOG_LEVELS),
                ),
        )
}

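/// If `--syslog <LEVEL>` was passed, initializes syslog logging at that level
/// under the given daemon name; initialization errors are deliberately ignored.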
fn check_init_syslog<'a>(name: &str, matches: &ArgMatches<'a>) {
    if matches.is_present("syslog") {
        let level = value_t!(matches, "syslog", log::LevelFilter).unwrap_or_else(|e| e.exit());
        drop(syslog::init(Facility::LOG_DAEMON, level, Some(name)));
    }
}

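/// Parses the command line into a `Command`, loading the scheduler or server
/// config file where one is required.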
fn parse() -> Result<Command> {
    let matches = get_app().get_matches();
    Ok(match matches.subcommand() {
        ("auth", Some(matches)) => {
            Command::Auth(match matches.subcommand() {
                ("generate-jwt-hs256-key", Some(_matches)) => {
                    // Size based on https://briansmith.org/rustdoc/ring/hmac/fn.recommended_key_len.html
                    AuthSubcommand::Base64 { num_bytes: 256 / 8 }
                }
                ("generate-jwt-hs256-server-token", Some(matches)) => {
                    let server_id = ServerId::new(value_t_or_exit!(matches, "server", SocketAddr));
                    let secret_key = if let Some(config_path) =
                        matches.value_of("config").map(Path::new)
                    {
                        if let Some(config) = scheduler_config::from_path(config_path)? {
                            match config.server_auth {
                                scheduler_config::ServerAuth::JwtHS256 { secret_key } => secret_key,
                                scheduler_config::ServerAuth::Insecure
                                | scheduler_config::ServerAuth::Token { token: _ } => {
                                    bail!("Scheduler not configured with JWT HS256")
                                }
                            }
                        } else {
                            bail!("Could not load config")
                        }
                    } else {
                        matches
                            .value_of("secret-key")
                            .expect("missing secret-key in parsed subcommand")
                            .to_owned()
                    };
                    AuthSubcommand::JwtHS256ServerToken {
                        secret_key,
                        server_id,
                    }
                }
200 ("generate-shared-token", Some(matches)) => {
201 let bits = value_t_or_exit!(matches, "bits", usize);
202 if bits % 8 != 0 || bits < 64 || bits > 4096 {
203 bail!("Number of bits must be divisible by 8, greater than 64 and less than 4096")
204 }
205 AuthSubcommand::Base64 {
206 num_bytes: bits / 8,
207 }
208 }
209 _ => bail!("No subcommand of auth specified"),
210 })
211 }
212 ("scheduler", Some(matches)) => {
213 let config_path = Path::new(
214 matches
215 .value_of("config")
216 .expect("missing config in parsed subcommand"),
217 );
218 check_init_syslog("sccache-scheduler", &matches);
219 if let Some(config) = scheduler_config::from_path(config_path)? {
220 Command::Scheduler(config)
221 } else {
222 bail!("Could not load config")
223 }
224 }
225 ("server", Some(matches)) => {
226 let config_path = Path::new(
227 matches
228 .value_of("config")
229 .expect("missing config in parsed subcommand"),
230 );
231 check_init_syslog("sccache-buildserver", &matches);
232 if let Some(config) = server_config::from_path(config_path)? {
233 Command::Server(config)
234 } else {
235 bail!("Could not load config")
236 }
237 }
238 _ => bail!("No subcommand specified"),
239 })
240 }
241
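/// Builds the plain-text token a server presents to the scheduler, in the form
/// `"<server_addr> <auth_token>"`, e.g. `"10.0.0.1:10501 <token>"` (the address
/// here is purely illustrative).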
fn create_server_token(server_id: ServerId, auth_token: &str) -> String {
    format!("{} {}", server_id.addr(), auth_token)
}
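/// Validates a token produced by `create_server_token`, returning the embedded
/// `ServerId` only if the address parses and the auth token matches exactly.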
fn check_server_token(server_token: &str, auth_token: &str) -> Option<ServerId> {
    let mut split = server_token.splitn(2, |c| c == ' ');
    let server_addr = split.next().and_then(|addr| addr.parse().ok())?;
    match split.next() {
        Some(t) if t == auth_token => Some(ServerId::new(server_addr)),
        Some(_) | None => None,
    }
}

#[derive(Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
struct ServerJwt {
    server_id: ServerId,
}
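/// Signs a `ServerJwt` claim for `server_id` with the given HMAC secret,
/// producing the JWT used for `ServerAuth::JwtHS256`.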
fn create_jwt_server_token(
    server_id: ServerId,
    header: &jwt::Header,
    key: &[u8],
) -> Result<String> {
    let key = jwt::EncodingKey::from_secret(key);
    jwt::encode(&header, &ServerJwt { server_id }, &key).map_err(Into::into)
}
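/// Extracts the `ServerId` claim from a JWT *without* verifying its signature.
/// Only used here to sanity-check that a configured token matches the
/// configured server address.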
fn dangerous_insecure_extract_jwt_server_token(server_token: &str) -> Option<ServerId> {
    jwt::dangerous_insecure_decode::<ServerJwt>(&server_token)
        .map(|res| res.claims.server_id)
        .ok()
}
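/// Verifies a server JWT against the shared HMAC secret and returns the
/// `ServerId` claim if the signature and validation rules pass.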
fn check_jwt_server_token(
    server_token: &str,
    key: &[u8],
    validation: &jwt::Validation,
) -> Option<ServerId> {
    let key = jwt::DecodingKey::from_secret(key);
    jwt::decode::<ServerJwt>(server_token, &key, validation)
        .map(|res| res.claims.server_id)
        .ok()
}

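/// Executes the parsed command: prints a generated token or key for `auth`, or
/// brings up the scheduler / build server HTTP service, which never returns on
/// success.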
fn run(command: Command) -> Result<i32> {
    match command {
        Command::Auth(AuthSubcommand::Base64 { num_bytes }) => {
            let mut bytes = vec![0; num_bytes];
            OsRng.fill_bytes(&mut bytes);
            // As long as it can be copied, it doesn't matter if this is base64 or hex etc
            println!("{}", base64::encode_config(&bytes, base64::URL_SAFE_NO_PAD));
            Ok(0)
        }
        Command::Auth(AuthSubcommand::JwtHS256ServerToken {
            secret_key,
            server_id,
        }) => {
            let header = jwt::Header::new(jwt::Algorithm::HS256);
            let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)?;
            let token = create_jwt_server_token(server_id, &header, &secret_key)
                .context("Failed to create server token")?;
            println!("{}", token);
            Ok(0)
        }

        Command::Scheduler(scheduler_config::Config {
            public_addr,
            client_auth,
            server_auth,
        }) => {
            let check_client_auth: Box<dyn dist::http::ClientAuthCheck> = match client_auth {
                scheduler_config::ClientAuth::Insecure => Box::new(token_check::EqCheck::new(
                    INSECURE_DIST_CLIENT_TOKEN.to_owned(),
                )),
                scheduler_config::ClientAuth::Token { token } => {
                    Box::new(token_check::EqCheck::new(token))
                }
                scheduler_config::ClientAuth::JwtValidate {
                    audience,
                    issuer,
                    jwks_url,
                } => Box::new(
                    token_check::ValidJWTCheck::new(audience, issuer, &jwks_url)
                        .context("Failed to create a checker for valid JWTs")?,
                ),
                scheduler_config::ClientAuth::Mozilla { required_groups } => {
                    Box::new(token_check::MozillaCheck::new(required_groups))
                }
                scheduler_config::ClientAuth::ProxyToken { url, cache_secs } => {
                    Box::new(token_check::ProxyTokenCheck::new(url, cache_secs))
                }
            };

            let check_server_auth: dist::http::ServerAuthCheck = match server_auth {
                scheduler_config::ServerAuth::Insecure => {
                    warn!("Scheduler starting with DANGEROUSLY_INSECURE server authentication");
                    let token = INSECURE_DIST_SERVER_TOKEN;
                    Box::new(move |server_token| check_server_token(server_token, &token))
                }
                scheduler_config::ServerAuth::Token { token } => {
                    Box::new(move |server_token| check_server_token(server_token, &token))
                }
                scheduler_config::ServerAuth::JwtHS256 { secret_key } => {
                    let secret_key = base64::decode_config(&secret_key, base64::URL_SAFE_NO_PAD)
                        .context("Secret key base64 invalid")?;
                    if secret_key.len() != 256 / 8 {
                        bail!("Size of secret key incorrect")
                    }
                    let validation = jwt::Validation {
                        leeway: 0,
                        validate_exp: false,
                        validate_nbf: false,
                        aud: None,
                        iss: None,
                        sub: None,
                        algorithms: vec![jwt::Algorithm::HS256],
                    };
                    Box::new(move |server_token| {
                        check_jwt_server_token(server_token, &secret_key, &validation)
                    })
                }
            };

            daemonize()?;
            let scheduler = Scheduler::new();
            let http_scheduler = dist::http::Scheduler::new(
                public_addr,
                scheduler,
                check_client_auth,
                check_server_auth,
            );
            void::unreachable(http_scheduler.start()?);
        }

        Command::Server(server_config::Config {
            builder,
            cache_dir,
            public_addr,
            scheduler_url,
            scheduler_auth,
            toolchain_cache_size,
        }) => {
            let builder: Box<dyn dist::BuilderIncoming> = match builder {
                server_config::BuilderType::Docker => {
                    Box::new(build::DockerBuilder::new().context("Docker builder failed to start")?)
                }
                server_config::BuilderType::Overlay {
                    bwrap_path,
                    build_dir,
                } => Box::new(
                    build::OverlayBuilder::new(bwrap_path, build_dir)
                        .context("Overlay builder failed to start")?,
                ),
            };

            let server_id = ServerId::new(public_addr);
            let scheduler_auth = match scheduler_auth {
                server_config::SchedulerAuth::Insecure => {
                    warn!("Server starting with DANGEROUSLY_INSECURE scheduler authentication");
                    create_server_token(server_id, &INSECURE_DIST_SERVER_TOKEN)
                }
                server_config::SchedulerAuth::Token { token } => {
                    create_server_token(server_id, &token)
                }
                server_config::SchedulerAuth::JwtToken { token } => {
                    let token_server_id: ServerId =
                        dangerous_insecure_extract_jwt_server_token(&token)
                            .context("Could not decode scheduler auth jwt")?;
                    if token_server_id != server_id {
                        bail!(
                            "JWT server id ({:?}) did not match configured server id ({:?})",
                            token_server_id,
                            server_id
                        )
                    }
                    token
                }
            };

            let server = Server::new(builder, &cache_dir, toolchain_cache_size)
                .context("Failed to create sccache server instance")?;
            let http_server = dist::http::Server::new(
                public_addr,
                scheduler_url.to_url(),
                scheduler_auth,
                server,
            )
            .context("Failed to create sccache HTTP server instance")?;
            void::unreachable(http_server.start()?)
        }
    }
}

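/// Initializes `env_logger` output, but only when `RUST_LOG` is set in the
/// environment (syslog setup is handled separately via `--syslog`).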
fn init_logging() {
    if env::var("RUST_LOG").is_ok() {
        match env_logger::try_init() {
            Ok(_) => (),
            Err(e) => panic!("Failed to initialize logging: {:?}", e),
        }
    }
}

const MAX_PER_CORE_LOAD: f64 = 10f64;
const SERVER_REMEMBER_ERROR_TIMEOUT: Duration = Duration::from_secs(300);
const UNCLAIMED_PENDING_TIMEOUT: Duration = Duration::from_secs(300);
const UNCLAIMED_READY_TIMEOUT: Duration = Duration::from_secs(60);

#[derive(Copy, Clone)]
struct JobDetail {
    server_id: ServerId,
    state: JobState,
}

// To avoid deadlocking, make sure to do all locking at once (i.e. no further locking in a downward scope),
// in alphabetical order
pub struct Scheduler {
    job_count: AtomicUsize,

    // Currently running jobs, can never be Complete
    jobs: Mutex<BTreeMap<JobId, JobDetail>>,

    servers: Mutex<HashMap<ServerId, ServerDetails>>,
}

struct ServerDetails {
    jobs_assigned: HashSet<JobId>,
    // Jobs assigned that haven't seen a state change. Can only be pending
    // or ready.
    jobs_unclaimed: HashMap<JobId, Instant>,
    last_seen: Instant,
    last_error: Option<Instant>,
    num_cpus: usize,
    server_nonce: ServerNonce,
    job_authorizer: Box<dyn JobAuthorizer>,
}

impl Scheduler {
    pub fn new() -> Self {
        Scheduler {
            job_count: AtomicUsize::new(0),
            jobs: Mutex::new(BTreeMap::new()),
            servers: Mutex::new(HashMap::new()),
        }
    }

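    /// Removes servers that have not been seen within `HEARTBEAT_TIMEOUT`,
    /// dropping any jobs still assigned to them. Both locks must already be
    /// held by the caller (see the locking-order note above).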
    fn prune_servers(
        &self,
        servers: &mut MutexGuard<HashMap<ServerId, ServerDetails>>,
        jobs: &mut MutexGuard<BTreeMap<JobId, JobDetail>>,
    ) {
        let now = Instant::now();

        let mut dead_servers = Vec::new();

        for (&server_id, details) in servers.iter_mut() {
            if now.duration_since(details.last_seen) > dist::http::HEARTBEAT_TIMEOUT {
                dead_servers.push(server_id);
            }
        }

        for server_id in dead_servers {
            warn!(
                "Server {} appears to be dead, pruning it in the scheduler",
                server_id.addr()
            );
            let server_details = servers
                .remove(&server_id)
                .expect("server went missing from map");
            for job_id in server_details.jobs_assigned {
                warn!(
                    "Non-terminated job {} was cleaned up in server pruning",
                    job_id
                );
                // A job may be missing here if it failed to allocate
                // initially, so just warn if it's not present.
                if jobs.remove(&job_id).is_none() {
                    warn!(
                        "Non-terminated job {} assignment originally failed.",
                        job_id
                    );
                }
            }
        }
    }
}

impl SchedulerIncoming for Scheduler {
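    /// Allocates a job to the least-loaded server below `MAX_PER_CORE_LOAD`
    /// (jobs assigned per CPU). Servers with a recent error are only used as a
    /// fallback, preferring the one whose last error is oldest, and the chosen
    /// server is asked to accept the job before it is recorded as allocated.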
    fn handle_alloc_job(
        &self,
        requester: &dyn SchedulerOutgoing,
        tc: Toolchain,
    ) -> Result<AllocJobResult> {
        let (job_id, server_id, auth) = {
            // LOCKS
            let mut servers = self.servers.lock().unwrap();

            let res = {
                let mut best = None;
                let mut best_err = None;
                let mut best_load: f64 = MAX_PER_CORE_LOAD;
                let now = Instant::now();
                for (&server_id, details) in servers.iter_mut() {
                    let load = details.jobs_assigned.len() as f64 / details.num_cpus as f64;

                    if let Some(last_error) = details.last_error {
                        if load < MAX_PER_CORE_LOAD {
                            if now.duration_since(last_error) > SERVER_REMEMBER_ERROR_TIMEOUT {
                                details.last_error = None;
                            }
                            match best_err {
                                Some((
                                    _,
                                    &mut ServerDetails {
                                        last_error: Some(best_last_err),
                                        ..
                                    },
                                )) => {
                                    if last_error < best_last_err {
                                        trace!(
                                            "Selected {:?}, its most recent error is {:?} ago",
                                            server_id,
                                            now - last_error
                                        );
                                        best_err = Some((server_id, details));
                                    }
                                }
                                _ => {
                                    trace!(
                                        "Selected {:?}, its most recent error is {:?} ago",
                                        server_id,
                                        now - last_error
                                    );
                                    best_err = Some((server_id, details));
                                }
                            }
                        }
                    } else if load < best_load {
                        best = Some((server_id, details));
                        trace!("Selected {:?} as the server with the best load", server_id);
                        best_load = load;
                        if load == 0f64 {
                            break;
                        }
                    }
                }

                // Assign the job to our best choice
                if let Some((server_id, server_details)) = best.or(best_err) {
                    let job_count = self.job_count.fetch_add(1, Ordering::SeqCst) as u64;
                    let job_id = JobId(job_count);
                    assert!(server_details.jobs_assigned.insert(job_id));
                    assert!(server_details
                        .jobs_unclaimed
                        .insert(job_id, Instant::now())
                        .is_none());

                    info!(
                        "Job {} created and will be assigned to server {:?}",
                        job_id, server_id
                    );
                    let auth = server_details
                        .job_authorizer
                        .generate_token(job_id)
                        .map_err(Error::from)
                        .context("Could not create an auth token for this job")?;
                    Some((job_id, server_id, auth))
                } else {
                    None
                }
            };

            if let Some(res) = res {
                res
            } else {
                let msg = format!(
                    "Insufficient capacity across {} available servers",
                    servers.len()
                );
                return Ok(AllocJobResult::Fail { msg });
            }
        };
        let AssignJobResult {
            state,
            need_toolchain,
        } = requester
            .do_assign_job(server_id, job_id, tc, auth.clone())
            .with_context(|| {
                // LOCKS
                let mut servers = self.servers.lock().unwrap();
                if let Some(entry) = servers.get_mut(&server_id) {
                    entry.last_error = Some(Instant::now());
                    entry.jobs_unclaimed.remove(&job_id);
                    if !entry.jobs_assigned.remove(&job_id) {
                        "assign job failed and job not known to the server"
                    } else {
                        "assign job failed, job un-assigned from the server"
                    }
                } else {
                    "assign job failed and server not known"
                }
            })?;
        {
            // LOCKS
            let mut jobs = self.jobs.lock().unwrap();

            info!(
                "Job {} successfully assigned and saved with state {:?}",
                job_id, state
            );
            assert!(jobs
                .insert(job_id, JobDetail { server_id, state })
                .is_none());
        }
        let job_alloc = JobAlloc {
            auth,
            job_id,
            server_id,
        };
        Ok(AllocJobResult::Success {
            job_alloc,
            need_toolchain,
        })
    }

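    /// Processes a server heartbeat: refreshes `last_seen`, de-allocates jobs
    /// that have sat unclaimed past their timeout, and (re-)registers the server
    /// if it is new or has restarted with a different nonce.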
    fn handle_heartbeat_server(
        &self,
        server_id: ServerId,
        server_nonce: ServerNonce,
        num_cpus: usize,
        job_authorizer: Box<dyn JobAuthorizer>,
    ) -> Result<HeartbeatServerResult> {
        if num_cpus == 0 {
            bail!("Invalid number of CPUs (0) specified in heartbeat")
        }

        // LOCKS
        let mut jobs = self.jobs.lock().unwrap();
        let mut servers = self.servers.lock().unwrap();

        self.prune_servers(&mut servers, &mut jobs);

        match servers.get_mut(&server_id) {
            Some(ref mut details) if details.server_nonce == server_nonce => {
                let now = Instant::now();
                details.last_seen = now;

                let mut stale_jobs = Vec::new();
                for (&job_id, &last_seen) in details.jobs_unclaimed.iter() {
                    if now.duration_since(last_seen) < UNCLAIMED_READY_TIMEOUT {
                        continue;
                    }
                    if let Some(detail) = jobs.get(&job_id) {
                        match detail.state {
                            JobState::Ready => {
                                stale_jobs.push(job_id);
                            }
                            JobState::Pending => {
                                if now.duration_since(last_seen) > UNCLAIMED_PENDING_TIMEOUT {
                                    stale_jobs.push(job_id);
                                }
                            }
                            state => {
                                warn!("Invalid unclaimed job state for {}: {}", job_id, state);
                            }
                        }
                    } else {
                        warn!("Unknown stale job {}", job_id);
                        stale_jobs.push(job_id);
                    }
                }

                if !stale_jobs.is_empty() {
                    warn!(
                        "The following stale jobs will be de-allocated: {:?}",
                        stale_jobs
                    );

                    for job_id in stale_jobs {
                        if !details.jobs_assigned.remove(&job_id) {
                            warn!(
                                "Stale job for server {} not assigned: {}",
                                server_id.addr(),
                                job_id
                            );
                        }
                        if details.jobs_unclaimed.remove(&job_id).is_none() {
                            warn!(
                                "Unknown stale job for server {}: {}",
                                server_id.addr(),
                                job_id
                            );
                        }
                        if jobs.remove(&job_id).is_none() {
                            warn!(
                                "Unknown stale job for server {}: {}",
                                server_id.addr(),
                                job_id
                            );
                        }
                    }
                }

                return Ok(HeartbeatServerResult { is_new: false });
            }
            Some(ref mut details) if details.server_nonce != server_nonce => {
                for job_id in details.jobs_assigned.iter() {
                    if jobs.remove(&job_id).is_none() {
                        warn!(
                            "Unknown job found when replacing server {}: {}",
                            server_id.addr(),
                            job_id
                        );
                    }
                }
            }
            _ => (),
        }
        info!("Registered new server {:?}", server_id);
        servers.insert(
            server_id,
            ServerDetails {
                last_seen: Instant::now(),
                last_error: None,
                jobs_assigned: HashSet::new(),
                jobs_unclaimed: HashMap::new(),
                num_cpus,
                server_nonce,
                job_authorizer,
            },
        );
        Ok(HeartbeatServerResult { is_new: true })
    }

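    /// Applies a job state transition reported by a server. Only
    /// Pending -> Ready -> Started -> Complete transitions are accepted, and a
    /// Complete job is removed from the scheduler's bookkeeping entirely.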
    fn handle_update_job_state(
        &self,
        job_id: JobId,
        server_id: ServerId,
        job_state: JobState,
    ) -> Result<UpdateJobStateResult> {
        // LOCKS
        let mut jobs = self.jobs.lock().unwrap();
        let mut servers = self.servers.lock().unwrap();

        if let btree_map::Entry::Occupied(mut entry) = jobs.entry(job_id) {
            let job_detail = entry.get();
            if job_detail.server_id != server_id {
                bail!(
                    "Job id {} is not registered on server {:?}",
                    job_id,
                    server_id
                )
            }

            let now = Instant::now();
            let mut server_details = servers.get_mut(&server_id);
            if let Some(ref mut details) = server_details {
                details.last_seen = now;
            };

            match (job_detail.state, job_state) {
                (JobState::Pending, JobState::Ready) => entry.get_mut().state = job_state,
                (JobState::Ready, JobState::Started) => {
                    if let Some(details) = server_details {
                        details.jobs_unclaimed.remove(&job_id);
                    } else {
                        warn!("Job state updated, but server is not known to scheduler")
                    }
                    entry.get_mut().state = job_state
                }
                (JobState::Started, JobState::Complete) => {
                    let (job_id, _) = entry.remove_entry();
                    if let Some(entry) = server_details {
                        assert!(entry.jobs_assigned.remove(&job_id))
                    } else {
                        bail!("Job was marked as finished, but server is not known to scheduler")
                    }
                }
                (from, to) => bail!("Invalid job state transition from {} to {}", from, to),
            }
            info!("Job {} updated state to {:?}", job_id, job_state);
        } else {
            bail!("Unknown job")
        }
        Ok(UpdateJobStateResult::Success)
    }

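    /// Reports scheduler status after pruning dead servers: number of live
    /// servers, total CPUs across them, and jobs currently in progress.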
    fn handle_status(&self) -> Result<SchedulerStatusResult> {
        // LOCKS
        let mut jobs = self.jobs.lock().unwrap();
        let mut servers = self.servers.lock().unwrap();

        self.prune_servers(&mut servers, &mut jobs);

        Ok(SchedulerStatusResult {
            num_servers: servers.len(),
            num_cpus: servers.values().map(|v| v.num_cpus).sum(),
            in_progress: jobs.len(),
        })
    }
}

pub struct Server {
    builder: Box<dyn BuilderIncoming>,
    cache: Mutex<TcCache>,
    job_toolchains: Mutex<HashMap<JobId, Toolchain>>,
}

impl Server {
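    /// Creates a build server around the given builder, with the toolchain
    /// cache stored under `<cache_dir>/tc` and capped at `toolchain_cache_size`.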
    pub fn new(
        builder: Box<dyn BuilderIncoming>,
        cache_dir: &Path,
        toolchain_cache_size: u64,
    ) -> Result<Server> {
        let cache = TcCache::new(&cache_dir.join("tc"), toolchain_cache_size)
            .context("Failed to create toolchain cache")?;
        Ok(Server {
            builder,
            cache: Mutex::new(cache),
            job_toolchains: Mutex::new(HashMap::new()),
        })
    }
}

impl ServerIncoming for Server {
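    /// Records the toolchain for a newly assigned job and reports whether the
    /// client still needs to submit it (Pending) or it is already cached (Ready).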
    fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result<AssignJobResult> {
        let need_toolchain = !self.cache.lock().unwrap().contains_toolchain(&tc);
        assert!(self
            .job_toolchains
            .lock()
            .unwrap()
            .insert(job_id, tc)
            .is_none());
        let state = if need_toolchain {
            JobState::Pending
        } else {
            // TODO: can start prepping the build environment now
            JobState::Ready
        };
        Ok(AssignJobResult {
            state,
            need_toolchain,
        })
    }
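    /// Receives a toolchain archive for a pending job, inserting it into the
    /// local toolchain cache unless it is already present.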
    fn handle_submit_toolchain(
        &self,
        requester: &dyn ServerOutgoing,
        job_id: JobId,
        tc_rdr: ToolchainReader,
    ) -> Result<SubmitToolchainResult> {
        requester
            .do_update_job_state(job_id, JobState::Ready)
            .context("Updating job state failed")?;
        // TODO: need to lock the toolchain until the container has started
        // TODO: can start prepping container
        let tc = match self.job_toolchains.lock().unwrap().get(&job_id).cloned() {
            Some(tc) => tc,
            None => return Ok(SubmitToolchainResult::JobNotFound),
        };
        let mut cache = self.cache.lock().unwrap();
        // TODO: this returns before reading all the data, is that valid?
        if cache.contains_toolchain(&tc) {
            return Ok(SubmitToolchainResult::Success);
        }
        Ok(cache
            .insert_with(&tc, |mut file| {
                io::copy(&mut { tc_rdr }, &mut file).map(|_| ())
            })
            .map(|_| SubmitToolchainResult::Success)
            .unwrap_or(SubmitToolchainResult::CannotCache))
    }
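    /// Runs a compile job: marks it Started, hands the toolchain, command and
    /// inputs to the builder, and marks the job Complete regardless of whether
    /// the build itself succeeded.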
    fn handle_run_job(
        &self,
        requester: &dyn ServerOutgoing,
        job_id: JobId,
        command: CompileCommand,
        outputs: Vec<String>,
        inputs_rdr: InputsReader,
    ) -> Result<RunJobResult> {
        requester
            .do_update_job_state(job_id, JobState::Started)
            .context("Updating job state failed")?;
        let tc = self.job_toolchains.lock().unwrap().remove(&job_id);
        let res = match tc {
            None => Ok(RunJobResult::JobNotFound),
            Some(tc) => {
                match self
                    .builder
                    .run_build(tc, command, outputs, inputs_rdr, &self.cache)
                {
                    Err(e) => Err(e.context("run build failed")),
                    Ok(res) => Ok(RunJobResult::Complete(JobComplete {
                        output: res.output,
                        outputs: res.outputs,
                    })),
                }
            }
        };
        requester
            .do_update_job_state(job_id, JobState::Complete)
            .context("Updating job state failed")?;
        res
    }
}
