1package nomad 2 3import ( 4 "errors" 5 "fmt" 6 "io" 7 "net" 8 "time" 9 10 log "github.com/hashicorp/go-hclog" 11 "github.com/hashicorp/go-msgpack/codec" 12 13 "github.com/hashicorp/consul/agent/consul/autopilot" 14 cstructs "github.com/hashicorp/nomad/client/structs" 15 "github.com/hashicorp/nomad/helper/snapshot" 16 "github.com/hashicorp/nomad/nomad/structs" 17 "github.com/hashicorp/raft" 18 "github.com/hashicorp/serf/serf" 19) 20 21// Operator endpoint is used to perform low-level operator tasks for Nomad. 22type Operator struct { 23 srv *Server 24 logger log.Logger 25} 26 27func (op *Operator) register() { 28 op.srv.streamingRpcs.Register("Operator.SnapshotSave", op.snapshotSave) 29 op.srv.streamingRpcs.Register("Operator.SnapshotRestore", op.snapshotRestore) 30} 31 32// RaftGetConfiguration is used to retrieve the current Raft configuration. 33func (op *Operator) RaftGetConfiguration(args *structs.GenericRequest, reply *structs.RaftConfigurationResponse) error { 34 if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done { 35 return err 36 } 37 38 // Check management permissions 39 if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil { 40 return err 41 } else if aclObj != nil && !aclObj.IsManagement() { 42 return structs.ErrPermissionDenied 43 } 44 45 // We can't fetch the leader and the configuration atomically with 46 // the current Raft API. 47 future := op.srv.raft.GetConfiguration() 48 if err := future.Error(); err != nil { 49 return err 50 } 51 52 // Index the Nomad information about the servers. 53 serverMap := make(map[raft.ServerAddress]serf.Member) 54 for _, member := range op.srv.serf.Members() { 55 valid, parts := isNomadServer(member) 56 if !valid { 57 continue 58 } 59 60 addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String() 61 serverMap[raft.ServerAddress(addr)] = member 62 } 63 64 // Fill out the reply. 65 leader := op.srv.raft.Leader() 66 reply.Index = future.Index() 67 for _, server := range future.Configuration().Servers { 68 node := "(unknown)" 69 raftProtocolVersion := "unknown" 70 if member, ok := serverMap[server.Address]; ok { 71 node = member.Name 72 if raftVsn, ok := member.Tags["raft_vsn"]; ok { 73 raftProtocolVersion = raftVsn 74 } 75 } 76 77 entry := &structs.RaftServer{ 78 ID: server.ID, 79 Node: node, 80 Address: server.Address, 81 Leader: server.Address == leader, 82 Voter: server.Suffrage == raft.Voter, 83 RaftProtocol: raftProtocolVersion, 84 } 85 reply.Servers = append(reply.Servers, entry) 86 } 87 return nil 88} 89 90// RaftRemovePeerByAddress is used to kick a stale peer (one that it in the Raft 91// quorum but no longer known to Serf or the catalog) by address in the form of 92// "IP:port". The reply argument is not used, but it required to fulfill the RPC 93// interface. 94func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressRequest, reply *struct{}) error { 95 if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done { 96 return err 97 } 98 99 // Check management permissions 100 if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil { 101 return err 102 } else if aclObj != nil && !aclObj.IsManagement() { 103 return structs.ErrPermissionDenied 104 } 105 106 // Since this is an operation designed for humans to use, we will return 107 // an error if the supplied address isn't among the peers since it's 108 // likely they screwed up. 109 { 110 future := op.srv.raft.GetConfiguration() 111 if err := future.Error(); err != nil { 112 return err 113 } 114 for _, s := range future.Configuration().Servers { 115 if s.Address == args.Address { 116 goto REMOVE 117 } 118 } 119 return fmt.Errorf("address %q was not found in the Raft configuration", 120 args.Address) 121 } 122 123REMOVE: 124 // The Raft library itself will prevent various forms of foot-shooting, 125 // like making a configuration with no voters. Some consideration was 126 // given here to adding more checks, but it was decided to make this as 127 // low-level and direct as possible. We've got ACL coverage to lock this 128 // down, and if you are an operator, it's assumed you know what you are 129 // doing if you are calling this. If you remove a peer that's known to 130 // Serf, for example, it will come back when the leader does a reconcile 131 // pass. 132 future := op.srv.raft.RemovePeer(args.Address) 133 if err := future.Error(); err != nil { 134 op.logger.Warn("failed to remove Raft peer", "peer", args.Address, "error", err) 135 return err 136 } 137 138 op.logger.Warn("removed Raft peer", "peer", args.Address) 139 return nil 140} 141 142// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft 143// quorum but no longer known to Serf or the catalog) by address in the form of 144// "IP:port". The reply argument is not used, but is required to fulfill the RPC 145// interface. 146func (op *Operator) RaftRemovePeerByID(args *structs.RaftPeerByIDRequest, reply *struct{}) error { 147 if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done { 148 return err 149 } 150 151 // Check management permissions 152 if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil { 153 return err 154 } else if aclObj != nil && !aclObj.IsManagement() { 155 return structs.ErrPermissionDenied 156 } 157 158 // Since this is an operation designed for humans to use, we will return 159 // an error if the supplied id isn't among the peers since it's 160 // likely they screwed up. 161 var address raft.ServerAddress 162 { 163 future := op.srv.raft.GetConfiguration() 164 if err := future.Error(); err != nil { 165 return err 166 } 167 for _, s := range future.Configuration().Servers { 168 if s.ID == args.ID { 169 address = s.Address 170 goto REMOVE 171 } 172 } 173 return fmt.Errorf("id %q was not found in the Raft configuration", 174 args.ID) 175 } 176 177REMOVE: 178 // The Raft library itself will prevent various forms of foot-shooting, 179 // like making a configuration with no voters. Some consideration was 180 // given here to adding more checks, but it was decided to make this as 181 // low-level and direct as possible. We've got ACL coverage to lock this 182 // down, and if you are an operator, it's assumed you know what you are 183 // doing if you are calling this. If you remove a peer that's known to 184 // Serf, for example, it will come back when the leader does a reconcile 185 // pass. 186 minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol() 187 if err != nil { 188 return err 189 } 190 191 var future raft.Future 192 if minRaftProtocol >= 2 { 193 future = op.srv.raft.RemoveServer(args.ID, 0, 0) 194 } else { 195 future = op.srv.raft.RemovePeer(address) 196 } 197 if err := future.Error(); err != nil { 198 op.logger.Warn("failed to remove Raft peer", "peer_id", args.ID, "error", err) 199 return err 200 } 201 202 op.logger.Warn("removed Raft peer", "peer_id", args.ID) 203 return nil 204} 205 206// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. 207func (op *Operator) AutopilotGetConfiguration(args *structs.GenericRequest, reply *structs.AutopilotConfig) error { 208 if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done { 209 return err 210 } 211 212 // This action requires operator read access. 213 rule, err := op.srv.ResolveToken(args.AuthToken) 214 if err != nil { 215 return err 216 } 217 if rule != nil && !rule.AllowOperatorRead() { 218 return structs.ErrPermissionDenied 219 } 220 221 state := op.srv.fsm.State() 222 _, config, err := state.AutopilotConfig() 223 if err != nil { 224 return err 225 } 226 if config == nil { 227 return fmt.Errorf("autopilot config not initialized yet") 228 } 229 230 *reply = *config 231 232 return nil 233} 234 235// AutopilotSetConfiguration is used to set the current Autopilot configuration. 236func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error { 237 if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done { 238 return err 239 } 240 241 // This action requires operator write access. 242 rule, err := op.srv.ResolveToken(args.AuthToken) 243 if err != nil { 244 return err 245 } 246 if rule != nil && !rule.AllowOperatorWrite() { 247 return structs.ErrPermissionDenied 248 } 249 250 // All servers should be at or above 0.8.0 to apply this operatation 251 if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) { 252 return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion) 253 } 254 255 // Apply the update 256 resp, _, err := op.srv.raftApply(structs.AutopilotRequestType, args) 257 if err != nil { 258 op.logger.Error("failed applying AutoPilot configuration", "error", err) 259 return err 260 } 261 if respErr, ok := resp.(error); ok { 262 return respErr 263 } 264 265 // Check if the return type is a bool. 266 if respBool, ok := resp.(bool); ok { 267 *reply = respBool 268 } 269 return nil 270} 271 272// ServerHealth is used to get the current health of the servers. 273func (op *Operator) ServerHealth(args *structs.GenericRequest, reply *autopilot.OperatorHealthReply) error { 274 // This must be sent to the leader, so we fix the args since we are 275 // re-using a structure where we don't support all the options. 276 args.AllowStale = false 277 if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done { 278 return err 279 } 280 281 // This action requires operator read access. 282 rule, err := op.srv.ResolveToken(args.AuthToken) 283 if err != nil { 284 return err 285 } 286 if rule != nil && !rule.AllowOperatorRead() { 287 return structs.ErrPermissionDenied 288 } 289 290 // Exit early if the min Raft version is too low 291 minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol() 292 if err != nil { 293 return fmt.Errorf("error getting server raft protocol versions: %s", err) 294 } 295 if minRaftProtocol < 3 { 296 return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint") 297 } 298 299 *reply = op.srv.autopilot.GetClusterHealth() 300 301 return nil 302} 303 304// SchedulerSetConfiguration is used to set the current Scheduler configuration. 305func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *structs.SchedulerSetConfigurationResponse) error { 306 if done, err := op.srv.forward("Operator.SchedulerSetConfiguration", args, args, reply); done { 307 return err 308 } 309 310 // This action requires operator write access. 311 rule, err := op.srv.ResolveToken(args.AuthToken) 312 if err != nil { 313 return err 314 } else if rule != nil && !rule.AllowOperatorWrite() { 315 return structs.ErrPermissionDenied 316 } 317 318 // All servers should be at or above 0.9.0 to apply this operation 319 if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) { 320 return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion) 321 } 322 323 // Apply the update 324 resp, index, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args) 325 if err != nil { 326 op.logger.Error("failed applying Scheduler configuration", "error", err) 327 return err 328 } else if respErr, ok := resp.(error); ok { 329 return respErr 330 } 331 // If CAS request, raft returns a boolean indicating if the update was applied. 332 // Otherwise, assume success 333 reply.Updated = true 334 if respBool, ok := resp.(bool); ok { 335 reply.Updated = respBool 336 } 337 reply.Index = index 338 return nil 339} 340 341// SchedulerGetConfiguration is used to retrieve the current Scheduler configuration. 342func (op *Operator) SchedulerGetConfiguration(args *structs.GenericRequest, reply *structs.SchedulerConfigurationResponse) error { 343 if done, err := op.srv.forward("Operator.SchedulerGetConfiguration", args, args, reply); done { 344 return err 345 } 346 347 // This action requires operator read access. 348 rule, err := op.srv.ResolveToken(args.AuthToken) 349 if err != nil { 350 return err 351 } else if rule != nil && !rule.AllowOperatorRead() { 352 return structs.ErrPermissionDenied 353 } 354 355 state := op.srv.fsm.State() 356 index, config, err := state.SchedulerConfig() 357 358 if err != nil { 359 return err 360 } else if config == nil { 361 return fmt.Errorf("scheduler config not initialized yet") 362 } 363 364 reply.SchedulerConfig = config 365 reply.QueryMeta.Index = index 366 op.srv.setQueryMeta(&reply.QueryMeta) 367 368 return nil 369} 370 371func (op *Operator) forwardStreamingRPC(region string, method string, args interface{}, in io.ReadWriteCloser) error { 372 server, err := op.srv.findRegionServer(region) 373 if err != nil { 374 return err 375 } 376 377 return op.forwardStreamingRPCToServer(server, method, args, in) 378} 379 380func (op *Operator) forwardStreamingRPCToServer(server *serverParts, method string, args interface{}, in io.ReadWriteCloser) error { 381 srvConn, err := op.srv.streamingRpc(server, method) 382 if err != nil { 383 return err 384 } 385 defer srvConn.Close() 386 387 outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle) 388 if err := outEncoder.Encode(args); err != nil { 389 return err 390 } 391 392 structs.Bridge(in, srvConn) 393 return nil 394} 395 396func (op *Operator) snapshotSave(conn io.ReadWriteCloser) { 397 defer conn.Close() 398 399 var args structs.SnapshotSaveRequest 400 var reply structs.SnapshotSaveResponse 401 decoder := codec.NewDecoder(conn, structs.MsgpackHandle) 402 encoder := codec.NewEncoder(conn, structs.MsgpackHandle) 403 404 handleFailure := func(code int, err error) { 405 encoder.Encode(&structs.SnapshotSaveResponse{ 406 ErrorCode: code, 407 ErrorMsg: err.Error(), 408 }) 409 } 410 411 if err := decoder.Decode(&args); err != nil { 412 handleFailure(500, err) 413 return 414 } 415 416 // Forward to appropriate region 417 if args.Region != op.srv.Region() { 418 err := op.forwardStreamingRPC(args.Region, "Operator.SnapshotSave", args, conn) 419 if err != nil { 420 handleFailure(500, err) 421 } 422 return 423 } 424 425 // forward to leader 426 if !args.AllowStale { 427 remoteServer, err := op.srv.getLeaderForRPC() 428 if err != nil { 429 handleFailure(500, err) 430 return 431 } 432 if remoteServer != nil { 433 err := op.forwardStreamingRPCToServer(remoteServer, "Operator.SnapshotSave", args, conn) 434 if err != nil { 435 handleFailure(500, err) 436 } 437 return 438 439 } 440 } 441 442 // Check agent permissions 443 if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil { 444 code := 500 445 if err == structs.ErrTokenNotFound { 446 code = 400 447 } 448 handleFailure(code, err) 449 return 450 } else if aclObj != nil && !aclObj.IsManagement() { 451 handleFailure(403, structs.ErrPermissionDenied) 452 return 453 } 454 455 op.srv.setQueryMeta(&reply.QueryMeta) 456 457 // Take the snapshot and capture the index. 458 snap, err := snapshot.New(op.logger.Named("snapshot"), op.srv.raft) 459 reply.SnapshotChecksum = snap.Checksum() 460 reply.Index = snap.Index() 461 if err != nil { 462 handleFailure(500, err) 463 return 464 } 465 defer snap.Close() 466 467 if err := encoder.Encode(&reply); err != nil { 468 handleFailure(500, fmt.Errorf("failed to encode response: %v", err)) 469 return 470 } 471 if snap != nil { 472 if _, err := io.Copy(conn, snap); err != nil { 473 handleFailure(500, fmt.Errorf("failed to stream snapshot: %v", err)) 474 } 475 } 476} 477 478func (op *Operator) snapshotRestore(conn io.ReadWriteCloser) { 479 defer conn.Close() 480 481 var args structs.SnapshotRestoreRequest 482 var reply structs.SnapshotRestoreResponse 483 decoder := codec.NewDecoder(conn, structs.MsgpackHandle) 484 encoder := codec.NewEncoder(conn, structs.MsgpackHandle) 485 486 handleFailure := func(code int, err error) { 487 encoder.Encode(&structs.SnapshotRestoreResponse{ 488 ErrorCode: code, 489 ErrorMsg: err.Error(), 490 }) 491 } 492 493 if err := decoder.Decode(&args); err != nil { 494 handleFailure(500, err) 495 return 496 } 497 498 // Forward to appropriate region 499 if args.Region != op.srv.Region() { 500 err := op.forwardStreamingRPC(args.Region, "Operator.SnapshotRestore", args, conn) 501 if err != nil { 502 handleFailure(500, err) 503 } 504 return 505 } 506 507 // forward to leader 508 remoteServer, err := op.srv.getLeaderForRPC() 509 if err != nil { 510 handleFailure(500, err) 511 return 512 } 513 if remoteServer != nil { 514 err := op.forwardStreamingRPCToServer(remoteServer, "Operator.SnapshotRestore", args, conn) 515 if err != nil { 516 handleFailure(500, err) 517 } 518 return 519 520 } 521 522 // Check agent permissions 523 if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil { 524 code := 500 525 if err == structs.ErrTokenNotFound { 526 code = 400 527 } 528 handleFailure(code, err) 529 return 530 } else if aclObj != nil && !aclObj.IsManagement() { 531 handleFailure(403, structs.ErrPermissionDenied) 532 return 533 } 534 535 op.srv.setQueryMeta(&reply.QueryMeta) 536 537 reader, errCh := decodeStreamOutput(decoder) 538 539 err = snapshot.Restore(op.logger.Named("snapshot"), reader, op.srv.raft) 540 if err != nil { 541 handleFailure(500, fmt.Errorf("failed to restore from snapshot: %v", err)) 542 return 543 } 544 545 err = <-errCh 546 if err != nil { 547 handleFailure(400, fmt.Errorf("failed to read stream: %v", err)) 548 return 549 } 550 551 // This'll be used for feedback from the leader loop. 552 timeoutCh := time.After(time.Minute) 553 554 lerrCh := make(chan error, 1) 555 556 select { 557 // Reassert leader actions and update all leader related state 558 // with new state store content. 559 case op.srv.reassertLeaderCh <- lerrCh: 560 561 // We might have lost leadership while waiting to kick the loop. 562 case <-timeoutCh: 563 handleFailure(500, fmt.Errorf("timed out waiting to re-run leader actions")) 564 565 // Make sure we don't get stuck during shutdown 566 case <-op.srv.shutdownCh: 567 } 568 569 select { 570 // Wait for the leader loop to finish up. 571 case err := <-lerrCh: 572 if err != nil { 573 handleFailure(500, err) 574 return 575 } 576 577 // We might have lost leadership while the loop was doing its 578 // thing. 579 case <-timeoutCh: 580 handleFailure(500, fmt.Errorf("timed out waiting for re-run of leader actions")) 581 582 // Make sure we don't get stuck during shutdown 583 case <-op.srv.shutdownCh: 584 } 585 586 reply.Index, _ = op.srv.State().LatestIndex() 587 op.srv.setQueryMeta(&reply.QueryMeta) 588 encoder.Encode(reply) 589} 590 591func decodeStreamOutput(decoder *codec.Decoder) (io.Reader, <-chan error) { 592 pr, pw := io.Pipe() 593 errCh := make(chan error, 1) 594 595 go func() { 596 defer close(errCh) 597 598 for { 599 var wrapper cstructs.StreamErrWrapper 600 601 err := decoder.Decode(&wrapper) 602 if err != nil { 603 pw.CloseWithError(fmt.Errorf("failed to decode input: %v", err)) 604 errCh <- err 605 return 606 } 607 608 if len(wrapper.Payload) != 0 { 609 _, err = pw.Write(wrapper.Payload) 610 if err != nil { 611 pw.CloseWithError(err) 612 errCh <- err 613 return 614 } 615 } 616 617 if errW := wrapper.Error; errW != nil { 618 if errW.Message == io.EOF.Error() { 619 pw.CloseWithError(io.EOF) 620 } else { 621 pw.CloseWithError(errors.New(errW.Message)) 622 } 623 return 624 } 625 } 626 }() 627 628 return pr, errCh 629} 630