1package nomad 2 3import ( 4 "context" 5 "fmt" 6 "strings" 7 "sync" 8 "time" 9 10 "golang.org/x/sync/errgroup" 11 12 metrics "github.com/armon/go-metrics" 13 log "github.com/hashicorp/go-hclog" 14 memdb "github.com/hashicorp/go-memdb" 15 multierror "github.com/hashicorp/go-multierror" 16 vapi "github.com/hashicorp/vault/api" 17 18 "github.com/hashicorp/nomad/acl" 19 "github.com/hashicorp/nomad/helper/uuid" 20 "github.com/hashicorp/nomad/nomad/state" 21 "github.com/hashicorp/nomad/nomad/structs" 22 "github.com/hashicorp/raft" 23 "github.com/pkg/errors" 24) 25 26const ( 27 // batchUpdateInterval is how long we wait to batch updates 28 batchUpdateInterval = 50 * time.Millisecond 29 30 // maxParallelRequestsPerDerive is the maximum number of parallel Vault 31 // create token requests that may be outstanding per derive request 32 maxParallelRequestsPerDerive = 16 33 34 // NodeDrainEvents are the various drain messages 35 NodeDrainEventDrainSet = "Node drain strategy set" 36 NodeDrainEventDrainDisabled = "Node drain disabled" 37 NodeDrainEventDrainUpdated = "Node drain stategy updated" 38 39 // NodeEligibilityEventEligible is used when the nodes eligiblity is marked 40 // eligible 41 NodeEligibilityEventEligible = "Node marked as eligible for scheduling" 42 43 // NodeEligibilityEventIneligible is used when the nodes eligiblity is marked 44 // ineligible 45 NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling" 46 47 // NodeHeartbeatEventReregistered is the message used when the node becomes 48 // reregistered by the heartbeat. 49 NodeHeartbeatEventReregistered = "Node reregistered by heartbeat" 50) 51 52// Node endpoint is used for client interactions 53type Node struct { 54 srv *Server 55 logger log.Logger 56 57 // ctx provides context regarding the underlying connection 58 ctx *RPCContext 59 60 // updates holds pending client status updates for allocations 61 updates []*structs.Allocation 62 63 // evals holds pending rescheduling eval updates triggered by failed allocations 64 evals []*structs.Evaluation 65 66 // updateFuture is used to wait for the pending batch update 67 // to complete. This may be nil if no batch is pending. 68 updateFuture *structs.BatchFuture 69 70 // updateTimer is the timer that will trigger the next batch 71 // update, and may be nil if there is no batch pending. 72 updateTimer *time.Timer 73 74 // updatesLock synchronizes access to the updates list, 75 // the future and the timer. 76 updatesLock sync.Mutex 77} 78 79// Register is used to upsert a client that is available for scheduling 80func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error { 81 isForwarded := args.IsForwarded() 82 if done, err := n.srv.forward("Node.Register", args, args, reply); done { 83 // We have a valid node connection since there is no error from the 84 // forwarded server, so add the mapping to cache the 85 // connection and allow the server to send RPCs to the client. 86 if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded { 87 n.ctx.NodeID = args.Node.ID 88 n.srv.addNodeConn(n.ctx) 89 } 90 91 return err 92 } 93 defer metrics.MeasureSince([]string{"nomad", "client", "register"}, time.Now()) 94 95 // Validate the arguments 96 if args.Node == nil { 97 return fmt.Errorf("missing node for client registration") 98 } 99 if args.Node.ID == "" { 100 return fmt.Errorf("missing node ID for client registration") 101 } 102 if args.Node.Datacenter == "" { 103 return fmt.Errorf("missing datacenter for client registration") 104 } 105 if args.Node.Name == "" { 106 return fmt.Errorf("missing node name for client registration") 107 } 108 if len(args.Node.Attributes) == 0 { 109 return fmt.Errorf("missing attributes for client registration") 110 } 111 if args.Node.SecretID == "" { 112 return fmt.Errorf("missing node secret ID for client registration") 113 } 114 115 // Default the status if none is given 116 if args.Node.Status == "" { 117 args.Node.Status = structs.NodeStatusInit 118 } 119 if !structs.ValidNodeStatus(args.Node.Status) { 120 return fmt.Errorf("invalid status for node") 121 } 122 123 // Default to eligible for scheduling if unset 124 if args.Node.SchedulingEligibility == "" { 125 args.Node.SchedulingEligibility = structs.NodeSchedulingEligible 126 } 127 128 // Set the timestamp when the node is registered 129 args.Node.StatusUpdatedAt = time.Now().Unix() 130 131 // Compute the node class 132 if err := args.Node.ComputeClass(); err != nil { 133 return fmt.Errorf("failed to computed node class: %v", err) 134 } 135 136 // Look for the node so we can detect a state transition 137 snap, err := n.srv.fsm.State().Snapshot() 138 if err != nil { 139 return err 140 } 141 142 ws := memdb.NewWatchSet() 143 originalNode, err := snap.NodeByID(ws, args.Node.ID) 144 if err != nil { 145 return err 146 } 147 148 // Check if the SecretID has been tampered with 149 if originalNode != nil { 150 if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" { 151 return fmt.Errorf("node secret ID does not match. Not registering node.") 152 } 153 } 154 155 // We have a valid node connection, so add the mapping to cache the 156 // connection and allow the server to send RPCs to the client. We only cache 157 // the connection if it is not being forwarded from another server. 158 if n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { 159 n.ctx.NodeID = args.Node.ID 160 n.srv.addNodeConn(n.ctx) 161 } 162 163 // Commit this update via Raft 164 _, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args) 165 if err != nil { 166 n.logger.Error("register failed", "error", err) 167 return err 168 } 169 reply.NodeModifyIndex = index 170 171 // Check if we should trigger evaluations 172 originalStatus := structs.NodeStatusInit 173 if originalNode != nil { 174 originalStatus = originalNode.Status 175 } 176 transitionToReady := transitionedToReady(args.Node.Status, originalStatus) 177 if structs.ShouldDrainNode(args.Node.Status) || transitionToReady { 178 evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index) 179 if err != nil { 180 n.logger.Error("eval creation failed", "error", err) 181 return err 182 } 183 reply.EvalIDs = evalIDs 184 reply.EvalCreateIndex = evalIndex 185 } 186 187 // Check if we need to setup a heartbeat 188 if !args.Node.TerminalStatus() { 189 ttl, err := n.srv.resetHeartbeatTimer(args.Node.ID) 190 if err != nil { 191 n.logger.Error("heartbeat reset failed", "error", err) 192 return err 193 } 194 reply.HeartbeatTTL = ttl 195 } 196 197 // Set the reply index 198 reply.Index = index 199 snap, err = n.srv.fsm.State().Snapshot() 200 if err != nil { 201 return err 202 } 203 204 n.srv.peerLock.RLock() 205 defer n.srv.peerLock.RUnlock() 206 if err := n.constructNodeServerInfoResponse(snap, reply); err != nil { 207 n.logger.Error("failed to populate NodeUpdateResponse", "error", err) 208 return err 209 } 210 211 return nil 212} 213 214// updateNodeUpdateResponse assumes the n.srv.peerLock is held for reading. 215func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error { 216 reply.LeaderRPCAddr = string(n.srv.raft.Leader()) 217 218 // Reply with config information required for future RPC requests 219 reply.Servers = make([]*structs.NodeServerInfo, 0, len(n.srv.localPeers)) 220 for _, v := range n.srv.localPeers { 221 reply.Servers = append(reply.Servers, 222 &structs.NodeServerInfo{ 223 RPCAdvertiseAddr: v.RPCAddr.String(), 224 RPCMajorVersion: int32(v.MajorVersion), 225 RPCMinorVersion: int32(v.MinorVersion), 226 Datacenter: v.Datacenter, 227 }) 228 } 229 230 // TODO(sean@): Use an indexed node count instead 231 // 232 // Snapshot is used only to iterate over all nodes to create a node 233 // count to send back to Nomad Clients in their heartbeat so Clients 234 // can estimate the size of the cluster. 235 ws := memdb.NewWatchSet() 236 iter, err := snap.Nodes(ws) 237 if err == nil { 238 for { 239 raw := iter.Next() 240 if raw == nil { 241 break 242 } 243 reply.NumNodes++ 244 } 245 } 246 247 reply.Features = n.srv.EnterpriseState.Features() 248 249 return nil 250} 251 252// Deregister is used to remove a client from the cluster. If a client should 253// just be made unavailable for scheduling, a status update is preferred. 254func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error { 255 if done, err := n.srv.forward("Node.Deregister", args, args, reply); done { 256 return err 257 } 258 defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now()) 259 260 if args.NodeID == "" { 261 return fmt.Errorf("missing node ID for client deregistration") 262 } 263 264 // deregister takes a batch 265 repack := &structs.NodeBatchDeregisterRequest{ 266 NodeIDs: []string{args.NodeID}, 267 WriteRequest: args.WriteRequest, 268 } 269 270 return n.deregister(repack, reply, func() (interface{}, uint64, error) { 271 return n.srv.raftApply(structs.NodeDeregisterRequestType, args) 272 }) 273} 274 275// BatchDeregister is used to remove client nodes from the cluster. 276func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error { 277 if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done { 278 return err 279 } 280 defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now()) 281 282 if len(args.NodeIDs) == 0 { 283 return fmt.Errorf("missing node IDs for client deregistration") 284 } 285 286 return n.deregister(args, reply, func() (interface{}, uint64, error) { 287 return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args) 288 }) 289} 290 291// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister 292func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest, 293 reply *structs.NodeUpdateResponse, 294 raftApplyFn func() (interface{}, uint64, error), 295) error { 296 // Check request permissions 297 if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { 298 return err 299 } else if aclObj != nil && !aclObj.AllowNodeWrite() { 300 return structs.ErrPermissionDenied 301 } 302 303 // Look for the node 304 snap, err := n.srv.fsm.State().Snapshot() 305 if err != nil { 306 return err 307 } 308 309 ws := memdb.NewWatchSet() 310 for _, nodeID := range args.NodeIDs { 311 node, err := snap.NodeByID(ws, nodeID) 312 if err != nil { 313 return err 314 } 315 if node == nil { 316 return fmt.Errorf("node not found") 317 } 318 } 319 320 // Commit this update via Raft 321 _, index, err := raftApplyFn() 322 if err != nil { 323 n.logger.Error("raft message failed", "error", err) 324 return err 325 } 326 327 for _, nodeID := range args.NodeIDs { 328 // Clear the heartbeat timer if any 329 n.srv.clearHeartbeatTimer(nodeID) 330 331 // Create the evaluations for this node 332 evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index) 333 if err != nil { 334 n.logger.Error("eval creation failed", "error", err) 335 return err 336 } 337 338 // Determine if there are any Vault accessors on the node 339 if accessors, err := snap.VaultAccessorsByNode(ws, nodeID); err != nil { 340 n.logger.Error("looking up vault accessors for node failed", "node_id", nodeID, "error", err) 341 return err 342 } else if l := len(accessors); l > 0 { 343 n.logger.Debug("revoking vault accessors on node due to deregister", "num_accessors", l, "node_id", nodeID) 344 if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil { 345 n.logger.Error("revoking vault accessors for node failed", "node_id", nodeID, "error", err) 346 return err 347 } 348 } 349 350 // Determine if there are any SI token accessors on the node 351 if accessors, err := snap.SITokenAccessorsByNode(ws, nodeID); err != nil { 352 n.logger.Error("looking up si accessors for node failed", "node_id", nodeID, "error", err) 353 return err 354 } else if l := len(accessors); l > 0 { 355 n.logger.Debug("revoking si accessors on node due to deregister", "num_accessors", l, "node_id", nodeID) 356 // Unlike with the Vault integration, there's no error returned here, since 357 // bootstrapping the Consul client is elsewhere. Errors in revocation trigger 358 // background retry attempts rather than inline error handling. 359 _ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, true) 360 } 361 362 reply.EvalIDs = append(reply.EvalIDs, evalIDs...) 363 // Set the reply eval create index just the first time 364 if reply.EvalCreateIndex == 0 { 365 reply.EvalCreateIndex = evalIndex 366 } 367 } 368 369 reply.NodeModifyIndex = index 370 reply.Index = index 371 return nil 372} 373 374// UpdateStatus is used to update the status of a client node 375func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error { 376 isForwarded := args.IsForwarded() 377 if done, err := n.srv.forward("Node.UpdateStatus", args, args, reply); done { 378 // We have a valid node connection since there is no error from the 379 // forwarded server, so add the mapping to cache the 380 // connection and allow the server to send RPCs to the client. 381 if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded { 382 n.ctx.NodeID = args.NodeID 383 n.srv.addNodeConn(n.ctx) 384 } 385 386 return err 387 } 388 defer metrics.MeasureSince([]string{"nomad", "client", "update_status"}, time.Now()) 389 390 // Verify the arguments 391 if args.NodeID == "" { 392 return fmt.Errorf("missing node ID for client status update") 393 } 394 if !structs.ValidNodeStatus(args.Status) { 395 return fmt.Errorf("invalid status for node") 396 } 397 398 // Look for the node 399 snap, err := n.srv.fsm.State().Snapshot() 400 if err != nil { 401 return err 402 } 403 404 ws := memdb.NewWatchSet() 405 node, err := snap.NodeByID(ws, args.NodeID) 406 if err != nil { 407 return err 408 } 409 if node == nil { 410 return fmt.Errorf("node not found") 411 } 412 413 // We have a valid node connection, so add the mapping to cache the 414 // connection and allow the server to send RPCs to the client. We only cache 415 // the connection if it is not being forwarded from another server. 416 if n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { 417 n.ctx.NodeID = args.NodeID 418 n.srv.addNodeConn(n.ctx) 419 } 420 421 // XXX: Could use the SecretID here but have to update the heartbeat system 422 // to track SecretIDs. 423 424 // Update the timestamp of when the node status was updated 425 args.UpdatedAt = time.Now().Unix() 426 427 // Commit this update via Raft 428 var index uint64 429 if node.Status != args.Status { 430 // Attach an event if we are updating the node status to ready when it 431 // is down via a heartbeat 432 if node.Status == structs.NodeStatusDown && args.NodeEvent == nil { 433 args.NodeEvent = structs.NewNodeEvent(). 434 SetSubsystem(structs.NodeEventSubsystemCluster). 435 SetMessage(NodeHeartbeatEventReregistered) 436 } 437 438 _, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args) 439 if err != nil { 440 n.logger.Error("status update failed", "error", err) 441 return err 442 } 443 reply.NodeModifyIndex = index 444 } 445 446 // Check if we should trigger evaluations 447 transitionToReady := transitionedToReady(args.Status, node.Status) 448 if structs.ShouldDrainNode(args.Status) || transitionToReady { 449 evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) 450 if err != nil { 451 n.logger.Error("eval creation failed", "error", err) 452 return err 453 } 454 reply.EvalIDs = evalIDs 455 reply.EvalCreateIndex = evalIndex 456 } 457 458 // Check if we need to setup a heartbeat 459 switch args.Status { 460 case structs.NodeStatusDown: 461 // Determine if there are any Vault accessors on the node to cleanup 462 if accessors, err := n.srv.State().VaultAccessorsByNode(ws, args.NodeID); err != nil { 463 n.logger.Error("looking up vault accessors for node failed", "node_id", args.NodeID, "error", err) 464 return err 465 } else if l := len(accessors); l > 0 { 466 n.logger.Debug("revoking vault accessors on node due to down state", "num_accessors", l, "node_id", args.NodeID) 467 if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil { 468 n.logger.Error("revoking vault accessors for node failed", "node_id", args.NodeID, "error", err) 469 return err 470 } 471 } 472 473 // Determine if there are any SI token accessors on the node to cleanup 474 if accessors, err := n.srv.State().SITokenAccessorsByNode(ws, args.NodeID); err != nil { 475 n.logger.Error("looking up SI accessors for node failed", "node_id", args.NodeID, "error", err) 476 return err 477 } else if l := len(accessors); l > 0 { 478 n.logger.Debug("revoking SI accessors on node due to down state", "num_accessors", l, "node_id", args.NodeID) 479 _ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, true) 480 } 481 default: 482 ttl, err := n.srv.resetHeartbeatTimer(args.NodeID) 483 if err != nil { 484 n.logger.Error("heartbeat reset failed", "error", err) 485 return err 486 } 487 reply.HeartbeatTTL = ttl 488 } 489 490 // Set the reply index and leader 491 reply.Index = index 492 n.srv.peerLock.RLock() 493 defer n.srv.peerLock.RUnlock() 494 if err := n.constructNodeServerInfoResponse(snap, reply); err != nil { 495 n.logger.Error("failed to populate NodeUpdateResponse", "error", err) 496 return err 497 } 498 499 return nil 500} 501 502// transitionedToReady is a helper that takes a nodes new and old status and 503// returns whether it has transitioned to ready. 504func transitionedToReady(newStatus, oldStatus string) bool { 505 initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady 506 terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady 507 return initToReady || terminalToReady 508} 509 510// UpdateDrain is used to update the drain mode of a client node 511func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest, 512 reply *structs.NodeDrainUpdateResponse) error { 513 if done, err := n.srv.forward("Node.UpdateDrain", args, args, reply); done { 514 return err 515 } 516 defer metrics.MeasureSince([]string{"nomad", "client", "update_drain"}, time.Now()) 517 518 // Check node write permissions 519 if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { 520 return err 521 } else if aclObj != nil && !aclObj.AllowNodeWrite() { 522 return structs.ErrPermissionDenied 523 } 524 525 // Verify the arguments 526 if args.NodeID == "" { 527 return fmt.Errorf("missing node ID for drain update") 528 } 529 if args.NodeEvent != nil { 530 return fmt.Errorf("node event must not be set") 531 } 532 533 // Look for the node 534 snap, err := n.srv.fsm.State().Snapshot() 535 if err != nil { 536 return err 537 } 538 node, err := snap.NodeByID(nil, args.NodeID) 539 if err != nil { 540 return err 541 } 542 if node == nil { 543 return fmt.Errorf("node not found") 544 } 545 546 now := time.Now().UTC() 547 548 // Update the timestamp of when the node status was updated 549 args.UpdatedAt = now.Unix() 550 551 // Setup drain strategy 552 if args.DrainStrategy != nil { 553 // Mark start time for the drain 554 if node.DrainStrategy == nil { 555 args.DrainStrategy.StartedAt = now 556 } else { 557 args.DrainStrategy.StartedAt = node.DrainStrategy.StartedAt 558 } 559 560 // Mark the deadline time 561 if args.DrainStrategy.Deadline.Nanoseconds() > 0 { 562 args.DrainStrategy.ForceDeadline = now.Add(args.DrainStrategy.Deadline) 563 } 564 } 565 566 // Construct the node event 567 args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemDrain) 568 if node.DrainStrategy == nil && args.DrainStrategy != nil { 569 args.NodeEvent.SetMessage(NodeDrainEventDrainSet) 570 } else if node.DrainStrategy != nil && args.DrainStrategy != nil { 571 args.NodeEvent.SetMessage(NodeDrainEventDrainUpdated) 572 } else if node.DrainStrategy != nil && args.DrainStrategy == nil { 573 args.NodeEvent.SetMessage(NodeDrainEventDrainDisabled) 574 } else { 575 args.NodeEvent = nil 576 } 577 578 // Commit this update via Raft 579 _, index, err := n.srv.raftApply(structs.NodeUpdateDrainRequestType, args) 580 if err != nil { 581 n.logger.Error("drain update failed", "error", err) 582 return err 583 } 584 reply.NodeModifyIndex = index 585 586 // If the node is transitioning to be eligible, create Node evaluations 587 // because there may be a System job registered that should be evaluated. 588 if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil { 589 evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) 590 if err != nil { 591 n.logger.Error("eval creation failed", "error", err) 592 return err 593 } 594 reply.EvalIDs = evalIDs 595 reply.EvalCreateIndex = evalIndex 596 } 597 598 // Set the reply index 599 reply.Index = index 600 return nil 601} 602 603// UpdateEligibility is used to update the scheduling eligibility of a node 604func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest, 605 reply *structs.NodeEligibilityUpdateResponse) error { 606 if done, err := n.srv.forward("Node.UpdateEligibility", args, args, reply); done { 607 return err 608 } 609 defer metrics.MeasureSince([]string{"nomad", "client", "update_eligibility"}, time.Now()) 610 611 // Check node write permissions 612 if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { 613 return err 614 } else if aclObj != nil && !aclObj.AllowNodeWrite() { 615 return structs.ErrPermissionDenied 616 } 617 618 // Verify the arguments 619 if args.NodeID == "" { 620 return fmt.Errorf("missing node ID for setting scheduling eligibility") 621 } 622 if args.NodeEvent != nil { 623 return fmt.Errorf("node event must not be set") 624 } 625 626 // Check that only allowed types are set 627 switch args.Eligibility { 628 case structs.NodeSchedulingEligible, structs.NodeSchedulingIneligible: 629 default: 630 return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility) 631 } 632 633 // Look for the node 634 snap, err := n.srv.fsm.State().Snapshot() 635 if err != nil { 636 return err 637 } 638 node, err := snap.NodeByID(nil, args.NodeID) 639 if err != nil { 640 return err 641 } 642 if node == nil { 643 return fmt.Errorf("node not found") 644 } 645 646 if node.DrainStrategy != nil && args.Eligibility == structs.NodeSchedulingEligible { 647 return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining") 648 } 649 650 switch args.Eligibility { 651 case structs.NodeSchedulingEligible, structs.NodeSchedulingIneligible: 652 default: 653 return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility) 654 } 655 656 // Update the timestamp of when the node status was updated 657 args.UpdatedAt = time.Now().Unix() 658 659 // Construct the node event 660 args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster) 661 if node.SchedulingEligibility == args.Eligibility { 662 return nil // Nothing to do 663 } else if args.Eligibility == structs.NodeSchedulingEligible { 664 args.NodeEvent.SetMessage(NodeEligibilityEventEligible) 665 } else { 666 args.NodeEvent.SetMessage(NodeEligibilityEventIneligible) 667 } 668 669 // Commit this update via Raft 670 outErr, index, err := n.srv.raftApply(structs.NodeUpdateEligibilityRequestType, args) 671 if err != nil { 672 n.logger.Error("eligibility update failed", "error", err) 673 return err 674 } 675 if outErr != nil { 676 if err, ok := outErr.(error); ok && err != nil { 677 n.logger.Error("eligibility update failed", "error", err) 678 return err 679 } 680 } 681 682 // If the node is transitioning to be eligible, create Node evaluations 683 // because there may be a System job registered that should be evaluated. 684 if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible { 685 evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index) 686 if err != nil { 687 n.logger.Error("eval creation failed", "error", err) 688 return err 689 } 690 reply.EvalIDs = evalIDs 691 reply.EvalCreateIndex = evalIndex 692 } 693 694 // Set the reply index 695 reply.Index = index 696 return nil 697} 698 699// Evaluate is used to force a re-evaluation of the node 700func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUpdateResponse) error { 701 if done, err := n.srv.forward("Node.Evaluate", args, args, reply); done { 702 return err 703 } 704 defer metrics.MeasureSince([]string{"nomad", "client", "evaluate"}, time.Now()) 705 706 // Check node write permissions 707 if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { 708 return err 709 } else if aclObj != nil && !aclObj.AllowNodeWrite() { 710 return structs.ErrPermissionDenied 711 } 712 713 // Verify the arguments 714 if args.NodeID == "" { 715 return fmt.Errorf("missing node ID for evaluation") 716 } 717 718 // Look for the node 719 snap, err := n.srv.fsm.State().Snapshot() 720 if err != nil { 721 return err 722 } 723 ws := memdb.NewWatchSet() 724 node, err := snap.NodeByID(ws, args.NodeID) 725 if err != nil { 726 return err 727 } 728 if node == nil { 729 return fmt.Errorf("node not found") 730 } 731 732 // Create the evaluation 733 evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, node.ModifyIndex) 734 if err != nil { 735 n.logger.Error("eval creation failed", "error", err) 736 return err 737 } 738 reply.EvalIDs = evalIDs 739 reply.EvalCreateIndex = evalIndex 740 741 // Set the reply index 742 reply.Index = evalIndex 743 744 n.srv.peerLock.RLock() 745 defer n.srv.peerLock.RUnlock() 746 if err := n.constructNodeServerInfoResponse(snap, reply); err != nil { 747 n.logger.Error("failed to populate NodeUpdateResponse", "error", err) 748 return err 749 } 750 return nil 751} 752 753// GetNode is used to request information about a specific node 754func (n *Node) GetNode(args *structs.NodeSpecificRequest, 755 reply *structs.SingleNodeResponse) error { 756 if done, err := n.srv.forward("Node.GetNode", args, args, reply); done { 757 return err 758 } 759 defer metrics.MeasureSince([]string{"nomad", "client", "get_node"}, time.Now()) 760 761 // Check node read permissions 762 if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { 763 // If ResolveToken had an unexpected error return that 764 if err != structs.ErrTokenNotFound { 765 return err 766 } 767 768 // Attempt to lookup AuthToken as a Node.SecretID since nodes 769 // call this endpoint and don't have an ACL token. 770 node, stateErr := n.srv.fsm.State().NodeBySecretID(nil, args.AuthToken) 771 if stateErr != nil { 772 // Return the original ResolveToken error with this err 773 var merr multierror.Error 774 merr.Errors = append(merr.Errors, err, stateErr) 775 return merr.ErrorOrNil() 776 } 777 778 // Not a node or a valid ACL token 779 if node == nil { 780 return structs.ErrTokenNotFound 781 } 782 } else if aclObj != nil && !aclObj.AllowNodeRead() { 783 return structs.ErrPermissionDenied 784 } 785 786 // Setup the blocking query 787 opts := blockingOptions{ 788 queryOpts: &args.QueryOptions, 789 queryMeta: &reply.QueryMeta, 790 run: func(ws memdb.WatchSet, state *state.StateStore) error { 791 // Verify the arguments 792 if args.NodeID == "" { 793 return fmt.Errorf("missing node ID") 794 } 795 796 // Look for the node 797 out, err := state.NodeByID(ws, args.NodeID) 798 if err != nil { 799 return err 800 } 801 802 // Setup the output 803 if out != nil { 804 out = out.Sanitize() 805 reply.Node = out 806 reply.Index = out.ModifyIndex 807 } else { 808 // Use the last index that affected the nodes table 809 index, err := state.Index("nodes") 810 if err != nil { 811 return err 812 } 813 reply.Node = nil 814 reply.Index = index 815 } 816 817 // Set the query response 818 n.srv.setQueryMeta(&reply.QueryMeta) 819 return nil 820 }} 821 return n.srv.blockingRPC(&opts) 822} 823 824// GetAllocs is used to request allocations for a specific node 825func (n *Node) GetAllocs(args *structs.NodeSpecificRequest, 826 reply *structs.NodeAllocsResponse) error { 827 if done, err := n.srv.forward("Node.GetAllocs", args, args, reply); done { 828 return err 829 } 830 defer metrics.MeasureSince([]string{"nomad", "client", "get_allocs"}, time.Now()) 831 832 // Check node read and namespace job read permissions 833 aclObj, err := n.srv.ResolveToken(args.AuthToken) 834 if err != nil { 835 return err 836 } 837 if aclObj != nil && !aclObj.AllowNodeRead() { 838 return structs.ErrPermissionDenied 839 } 840 841 // cache namespace perms 842 readableNamespaces := map[string]bool{} 843 844 // readNS is a caching namespace read-job helper 845 readNS := func(ns string) bool { 846 if aclObj == nil { 847 // ACLs are disabled; everything is readable 848 return true 849 } 850 851 if readable, ok := readableNamespaces[ns]; ok { 852 // cache hit 853 return readable 854 } 855 856 // cache miss 857 readable := aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob) 858 readableNamespaces[ns] = readable 859 return readable 860 } 861 862 // Verify the arguments 863 if args.NodeID == "" { 864 return fmt.Errorf("missing node ID") 865 } 866 867 // Setup the blocking query 868 opts := blockingOptions{ 869 queryOpts: &args.QueryOptions, 870 queryMeta: &reply.QueryMeta, 871 run: func(ws memdb.WatchSet, state *state.StateStore) error { 872 // Look for the node 873 allocs, err := state.AllocsByNode(ws, args.NodeID) 874 if err != nil { 875 return err 876 } 877 878 // Setup the output 879 if n := len(allocs); n != 0 { 880 reply.Allocs = make([]*structs.Allocation, 0, n) 881 for _, alloc := range allocs { 882 if readNS(alloc.Namespace) { 883 reply.Allocs = append(reply.Allocs, alloc) 884 } 885 886 // Get the max of all allocs since 887 // subsequent requests need to start 888 // from the latest index 889 reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) 890 } 891 } else { 892 reply.Allocs = nil 893 894 // Use the last index that affected the nodes table 895 index, err := state.Index("allocs") 896 if err != nil { 897 return err 898 } 899 900 // Must provide non-zero index to prevent blocking 901 // Index 1 is impossible anyways (due to Raft internals) 902 if index == 0 { 903 reply.Index = 1 904 } else { 905 reply.Index = index 906 } 907 } 908 return nil 909 }} 910 return n.srv.blockingRPC(&opts) 911} 912 913// GetClientAllocs is used to request a lightweight list of alloc modify indexes 914// per allocation. 915func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest, 916 reply *structs.NodeClientAllocsResponse) error { 917 isForwarded := args.IsForwarded() 918 if done, err := n.srv.forward("Node.GetClientAllocs", args, args, reply); done { 919 // We have a valid node connection since there is no error from the 920 // forwarded server, so add the mapping to cache the 921 // connection and allow the server to send RPCs to the client. 922 if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded { 923 n.ctx.NodeID = args.NodeID 924 n.srv.addNodeConn(n.ctx) 925 } 926 927 return err 928 } 929 defer metrics.MeasureSince([]string{"nomad", "client", "get_client_allocs"}, time.Now()) 930 931 // Verify the arguments 932 if args.NodeID == "" { 933 return fmt.Errorf("missing node ID") 934 } 935 936 // numOldAllocs is used to detect if there is a garbage collection event 937 // that effects the node. When an allocation is garbage collected, that does 938 // not change the modify index changes and thus the query won't unblock, 939 // even though the set of allocations on the node has changed. 940 var numOldAllocs int 941 942 // Setup the blocking query 943 opts := blockingOptions{ 944 queryOpts: &args.QueryOptions, 945 queryMeta: &reply.QueryMeta, 946 run: func(ws memdb.WatchSet, state *state.StateStore) error { 947 // Look for the node 948 node, err := state.NodeByID(ws, args.NodeID) 949 if err != nil { 950 return err 951 } 952 953 var allocs []*structs.Allocation 954 if node != nil { 955 if args.SecretID == "" { 956 return fmt.Errorf("missing node secret ID for client status update") 957 } else if args.SecretID != node.SecretID { 958 return fmt.Errorf("node secret ID does not match") 959 } 960 961 // We have a valid node connection, so add the mapping to cache the 962 // connection and allow the server to send RPCs to the client. We only cache 963 // the connection if it is not being forwarded from another server. 964 if n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() { 965 n.ctx.NodeID = args.NodeID 966 n.srv.addNodeConn(n.ctx) 967 } 968 969 var err error 970 allocs, err = state.AllocsByNode(ws, args.NodeID) 971 if err != nil { 972 return err 973 } 974 } 975 976 reply.Allocs = make(map[string]uint64) 977 reply.MigrateTokens = make(map[string]string) 978 979 // preferTableIndex is used to determine whether we should build the 980 // response index based on the full table indexes versus the modify 981 // indexes of the allocations on the specific node. This is 982 // preferred in the case that the node doesn't yet have allocations 983 // or when we detect a GC that effects the node. 984 preferTableIndex := true 985 986 // Setup the output 987 if numAllocs := len(allocs); numAllocs != 0 { 988 preferTableIndex = false 989 990 for _, alloc := range allocs { 991 reply.Allocs[alloc.ID] = alloc.AllocModifyIndex 992 993 // If the allocation is going to do a migration, create a 994 // migration token so that the client can authenticate with 995 // the node hosting the previous allocation. 996 if alloc.ShouldMigrate() { 997 prevAllocation, err := state.AllocByID(ws, alloc.PreviousAllocation) 998 if err != nil { 999 return err 1000 } 1001 1002 if prevAllocation != nil && prevAllocation.NodeID != alloc.NodeID { 1003 allocNode, err := state.NodeByID(ws, prevAllocation.NodeID) 1004 if err != nil { 1005 return err 1006 } 1007 if allocNode == nil { 1008 // Node must have been GC'd so skip the token 1009 continue 1010 } 1011 1012 token, err := structs.GenerateMigrateToken(prevAllocation.ID, allocNode.SecretID) 1013 if err != nil { 1014 return err 1015 } 1016 reply.MigrateTokens[alloc.ID] = token 1017 } 1018 } 1019 1020 reply.Index = maxUint64(reply.Index, alloc.ModifyIndex) 1021 } 1022 1023 // Determine if we have less allocations than before. This 1024 // indicates there was a garbage collection 1025 if numAllocs < numOldAllocs { 1026 preferTableIndex = true 1027 } 1028 1029 // Store the new number of allocations 1030 numOldAllocs = numAllocs 1031 } 1032 1033 if preferTableIndex { 1034 // Use the last index that affected the nodes table 1035 index, err := state.Index("allocs") 1036 if err != nil { 1037 return err 1038 } 1039 1040 // Must provide non-zero index to prevent blocking 1041 // Index 1 is impossible anyways (due to Raft internals) 1042 if index == 0 { 1043 reply.Index = 1 1044 } else { 1045 reply.Index = index 1046 } 1047 } 1048 return nil 1049 }} 1050 return n.srv.blockingRPC(&opts) 1051} 1052 1053// UpdateAlloc is used to update the client status of an allocation 1054func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error { 1055 if done, err := n.srv.forward("Node.UpdateAlloc", args, args, reply); done { 1056 return err 1057 } 1058 defer metrics.MeasureSince([]string{"nomad", "client", "update_alloc"}, time.Now()) 1059 1060 // Ensure at least a single alloc 1061 if len(args.Alloc) == 0 { 1062 return fmt.Errorf("must update at least one allocation") 1063 } 1064 1065 // Ensure that evals aren't set from client RPCs 1066 // We create them here before the raft update 1067 if len(args.Evals) != 0 { 1068 return fmt.Errorf("evals field must not be set") 1069 } 1070 1071 // Update modified timestamp for client initiated allocation updates 1072 now := time.Now() 1073 var evals []*structs.Evaluation 1074 1075 for _, allocToUpdate := range args.Alloc { 1076 allocToUpdate.ModifyTime = now.UTC().UnixNano() 1077 1078 if !allocToUpdate.TerminalStatus() { 1079 continue 1080 } 1081 1082 alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID) 1083 if alloc == nil { 1084 continue 1085 } 1086 1087 // if the job has been purged, this will always return error 1088 job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID) 1089 if err != nil { 1090 n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err) 1091 continue 1092 } 1093 if job == nil { 1094 n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID) 1095 continue 1096 } 1097 1098 taskGroup := job.LookupTaskGroup(alloc.TaskGroup) 1099 if taskGroup == nil { 1100 continue 1101 } 1102 1103 // Add an evaluation if this is a failed alloc that is eligible for rescheduling 1104 if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) { 1105 eval := &structs.Evaluation{ 1106 ID: uuid.Generate(), 1107 Namespace: alloc.Namespace, 1108 TriggeredBy: structs.EvalTriggerRetryFailedAlloc, 1109 JobID: alloc.JobID, 1110 Type: job.Type, 1111 Priority: job.Priority, 1112 Status: structs.EvalStatusPending, 1113 CreateTime: now.UTC().UnixNano(), 1114 ModifyTime: now.UTC().UnixNano(), 1115 } 1116 evals = append(evals, eval) 1117 } 1118 } 1119 1120 // Add this to the batch 1121 n.updatesLock.Lock() 1122 n.updates = append(n.updates, args.Alloc...) 1123 n.evals = append(n.evals, evals...) 1124 1125 // Start a new batch if none 1126 future := n.updateFuture 1127 if future == nil { 1128 future = structs.NewBatchFuture() 1129 n.updateFuture = future 1130 n.updateTimer = time.AfterFunc(batchUpdateInterval, func() { 1131 // Get the pending updates 1132 n.updatesLock.Lock() 1133 updates := n.updates 1134 evals := n.evals 1135 future := n.updateFuture 1136 1137 // Assume future update patterns will be similar to 1138 // current batch and set cap appropriately to avoid 1139 // slice resizing. 1140 n.updates = make([]*structs.Allocation, 0, len(updates)) 1141 n.evals = make([]*structs.Evaluation, 0, len(evals)) 1142 1143 n.updateFuture = nil 1144 n.updateTimer = nil 1145 n.updatesLock.Unlock() 1146 1147 // Perform the batch update 1148 n.batchUpdate(future, updates, evals) 1149 }) 1150 } 1151 n.updatesLock.Unlock() 1152 1153 // Wait for the future 1154 if err := future.Wait(); err != nil { 1155 return err 1156 } 1157 1158 // Setup the response 1159 reply.Index = future.Index() 1160 return nil 1161} 1162 1163// batchUpdate is used to update all the allocations 1164func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) { 1165 // Group pending evals by jobID to prevent creating unnecessary evals 1166 evalsByJobId := make(map[structs.NamespacedID]struct{}) 1167 var trimmedEvals []*structs.Evaluation 1168 for _, eval := range evals { 1169 namespacedID := structs.NamespacedID{ 1170 ID: eval.JobID, 1171 Namespace: eval.Namespace, 1172 } 1173 _, exists := evalsByJobId[namespacedID] 1174 if !exists { 1175 now := time.Now().UTC().UnixNano() 1176 eval.CreateTime = now 1177 eval.ModifyTime = now 1178 trimmedEvals = append(trimmedEvals, eval) 1179 evalsByJobId[namespacedID] = struct{}{} 1180 } 1181 } 1182 1183 if len(trimmedEvals) > 0 { 1184 n.logger.Debug("adding evaluations for rescheduling failed allocations", "num_evals", len(trimmedEvals)) 1185 } 1186 // Prepare the batch update 1187 batch := &structs.AllocUpdateRequest{ 1188 Alloc: updates, 1189 Evals: trimmedEvals, 1190 WriteRequest: structs.WriteRequest{Region: n.srv.config.Region}, 1191 } 1192 1193 // Commit this update via Raft 1194 var mErr multierror.Error 1195 _, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch) 1196 if err != nil { 1197 n.logger.Error("alloc update failed", "error", err) 1198 mErr.Errors = append(mErr.Errors, err) 1199 } 1200 1201 // For each allocation we are updating, check if we should revoke any 1202 // - Vault token accessors 1203 // - Service Identity token accessors 1204 var ( 1205 revokeVault []*structs.VaultAccessor 1206 revokeSI []*structs.SITokenAccessor 1207 ) 1208 1209 for _, alloc := range updates { 1210 // Skip any allocation that isn't dead on the client 1211 if !alloc.Terminated() { 1212 continue 1213 } 1214 1215 ws := memdb.NewWatchSet() 1216 1217 // Determine if there are any orphaned Vault accessors for the allocation 1218 if accessors, err := n.srv.State().VaultAccessorsByAlloc(ws, alloc.ID); err != nil { 1219 n.logger.Error("looking up vault accessors for alloc failed", "alloc_id", alloc.ID, "error", err) 1220 mErr.Errors = append(mErr.Errors, err) 1221 } else { 1222 revokeVault = append(revokeVault, accessors...) 1223 } 1224 1225 // Determine if there are any orphaned SI accessors for the allocation 1226 if accessors, err := n.srv.State().SITokenAccessorsByAlloc(ws, alloc.ID); err != nil { 1227 n.logger.Error("looking up si accessors for alloc failed", "alloc_id", alloc.ID, "error", err) 1228 mErr.Errors = append(mErr.Errors, err) 1229 } else { 1230 revokeSI = append(revokeSI, accessors...) 1231 } 1232 } 1233 1234 // Revoke any orphaned Vault token accessors 1235 if l := len(revokeVault); l > 0 { 1236 n.logger.Debug("revoking vault accessors due to terminal allocations", "num_accessors", l) 1237 if err := n.srv.vault.RevokeTokens(context.Background(), revokeVault, true); err != nil { 1238 n.logger.Error("batched vault accessor revocation failed", "error", err) 1239 mErr.Errors = append(mErr.Errors, err) 1240 } 1241 } 1242 1243 // Revoke any orphaned SI token accessors 1244 if l := len(revokeSI); l > 0 { 1245 n.logger.Debug("revoking si accessors due to terminal allocations", "num_accessors", l) 1246 _ = n.srv.consulACLs.RevokeTokens(context.Background(), revokeSI, true) 1247 } 1248 1249 // Respond to the future 1250 future.Respond(index, mErr.ErrorOrNil()) 1251} 1252 1253// List is used to list the available nodes 1254func (n *Node) List(args *structs.NodeListRequest, 1255 reply *structs.NodeListResponse) error { 1256 if done, err := n.srv.forward("Node.List", args, args, reply); done { 1257 return err 1258 } 1259 defer metrics.MeasureSince([]string{"nomad", "client", "list"}, time.Now()) 1260 1261 // Check node read permissions 1262 if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil { 1263 return err 1264 } else if aclObj != nil && !aclObj.AllowNodeRead() { 1265 return structs.ErrPermissionDenied 1266 } 1267 1268 // Setup the blocking query 1269 opts := blockingOptions{ 1270 queryOpts: &args.QueryOptions, 1271 queryMeta: &reply.QueryMeta, 1272 run: func(ws memdb.WatchSet, state *state.StateStore) error { 1273 // Capture all the nodes 1274 var err error 1275 var iter memdb.ResultIterator 1276 if prefix := args.QueryOptions.Prefix; prefix != "" { 1277 iter, err = state.NodesByIDPrefix(ws, prefix) 1278 } else { 1279 iter, err = state.Nodes(ws) 1280 } 1281 if err != nil { 1282 return err 1283 } 1284 1285 var nodes []*structs.NodeListStub 1286 for { 1287 raw := iter.Next() 1288 if raw == nil { 1289 break 1290 } 1291 node := raw.(*structs.Node) 1292 nodes = append(nodes, node.Stub(args.Fields)) 1293 } 1294 reply.Nodes = nodes 1295 1296 // Use the last index that affected the jobs table 1297 index, err := state.Index("nodes") 1298 if err != nil { 1299 return err 1300 } 1301 reply.Index = index 1302 1303 // Set the query response 1304 n.srv.setQueryMeta(&reply.QueryMeta) 1305 return nil 1306 }} 1307 return n.srv.blockingRPC(&opts) 1308} 1309 1310// createNodeEvals is used to create evaluations for each alloc on a node. 1311// Each Eval is scoped to a job, so we need to potentially trigger many evals. 1312func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) { 1313 // Snapshot the state 1314 snap, err := n.srv.fsm.State().Snapshot() 1315 if err != nil { 1316 return nil, 0, fmt.Errorf("failed to snapshot state: %v", err) 1317 } 1318 1319 // Find all the allocations for this node 1320 ws := memdb.NewWatchSet() 1321 allocs, err := snap.AllocsByNode(ws, nodeID) 1322 if err != nil { 1323 return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err) 1324 } 1325 1326 sysJobsIter, err := snap.JobsByScheduler(ws, "system") 1327 if err != nil { 1328 return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err) 1329 } 1330 1331 var sysJobs []*structs.Job 1332 for job := sysJobsIter.Next(); job != nil; job = sysJobsIter.Next() { 1333 sysJobs = append(sysJobs, job.(*structs.Job)) 1334 } 1335 1336 // Fast-path if nothing to do 1337 if len(allocs) == 0 && len(sysJobs) == 0 { 1338 return nil, 0, nil 1339 } 1340 1341 // Create an eval for each JobID affected 1342 var evals []*structs.Evaluation 1343 var evalIDs []string 1344 jobIDs := make(map[string]struct{}) 1345 now := time.Now().UTC().UnixNano() 1346 1347 for _, alloc := range allocs { 1348 // Deduplicate on JobID 1349 if _, ok := jobIDs[alloc.JobID]; ok { 1350 continue 1351 } 1352 jobIDs[alloc.JobID] = struct{}{} 1353 1354 // Create a new eval 1355 eval := &structs.Evaluation{ 1356 ID: uuid.Generate(), 1357 Namespace: alloc.Namespace, 1358 Priority: alloc.Job.Priority, 1359 Type: alloc.Job.Type, 1360 TriggeredBy: structs.EvalTriggerNodeUpdate, 1361 JobID: alloc.JobID, 1362 NodeID: nodeID, 1363 NodeModifyIndex: nodeIndex, 1364 Status: structs.EvalStatusPending, 1365 CreateTime: now, 1366 ModifyTime: now, 1367 } 1368 evals = append(evals, eval) 1369 evalIDs = append(evalIDs, eval.ID) 1370 } 1371 1372 // Create an evaluation for each system job. 1373 for _, job := range sysJobs { 1374 // Still dedup on JobID as the node may already have the system job. 1375 if _, ok := jobIDs[job.ID]; ok { 1376 continue 1377 } 1378 jobIDs[job.ID] = struct{}{} 1379 1380 // Create a new eval 1381 eval := &structs.Evaluation{ 1382 ID: uuid.Generate(), 1383 Namespace: job.Namespace, 1384 Priority: job.Priority, 1385 Type: job.Type, 1386 TriggeredBy: structs.EvalTriggerNodeUpdate, 1387 JobID: job.ID, 1388 NodeID: nodeID, 1389 NodeModifyIndex: nodeIndex, 1390 Status: structs.EvalStatusPending, 1391 CreateTime: now, 1392 ModifyTime: now, 1393 } 1394 evals = append(evals, eval) 1395 evalIDs = append(evalIDs, eval.ID) 1396 } 1397 1398 // Create the Raft transaction 1399 update := &structs.EvalUpdateRequest{ 1400 Evals: evals, 1401 WriteRequest: structs.WriteRequest{Region: n.srv.config.Region}, 1402 } 1403 1404 // Commit this evaluation via Raft 1405 // XXX: There is a risk of partial failure where the node update succeeds 1406 // but that the EvalUpdate does not. 1407 _, evalIndex, err := n.srv.raftApply(structs.EvalUpdateRequestType, update) 1408 if err != nil { 1409 return nil, 0, err 1410 } 1411 return evalIDs, evalIndex, nil 1412} 1413 1414// DeriveVaultToken is used by the clients to request wrapped Vault tokens for 1415// tasks 1416func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, reply *structs.DeriveVaultTokenResponse) error { 1417 setError := func(e error, recoverable bool) { 1418 if e != nil { 1419 if re, ok := e.(*structs.RecoverableError); ok { 1420 reply.Error = re // No need to wrap if error is already a RecoverableError 1421 } else { 1422 reply.Error = structs.NewRecoverableError(e, recoverable).(*structs.RecoverableError) 1423 } 1424 n.logger.Error("DeriveVaultToken failed", "recoverable", recoverable, "error", e) 1425 } 1426 } 1427 1428 if done, err := n.srv.forward("Node.DeriveVaultToken", args, args, reply); done { 1429 setError(err, structs.IsRecoverable(err) || err == structs.ErrNoLeader) 1430 return nil 1431 } 1432 defer metrics.MeasureSince([]string{"nomad", "client", "derive_vault_token"}, time.Now()) 1433 1434 // Verify the arguments 1435 if args.NodeID == "" { 1436 setError(fmt.Errorf("missing node ID"), false) 1437 return nil 1438 } 1439 if args.SecretID == "" { 1440 setError(fmt.Errorf("missing node SecretID"), false) 1441 return nil 1442 } 1443 if args.AllocID == "" { 1444 setError(fmt.Errorf("missing allocation ID"), false) 1445 return nil 1446 } 1447 if len(args.Tasks) == 0 { 1448 setError(fmt.Errorf("no tasks specified"), false) 1449 return nil 1450 } 1451 1452 // Verify the following: 1453 // * The Node exists and has the correct SecretID 1454 // * The Allocation exists on the specified Node 1455 // * The Allocation contains the given tasks and they each require Vault 1456 // tokens 1457 snap, err := n.srv.fsm.State().Snapshot() 1458 if err != nil { 1459 setError(err, false) 1460 return nil 1461 } 1462 ws := memdb.NewWatchSet() 1463 node, err := snap.NodeByID(ws, args.NodeID) 1464 if err != nil { 1465 setError(err, false) 1466 return nil 1467 } 1468 if node == nil { 1469 setError(fmt.Errorf("Node %q does not exist", args.NodeID), false) 1470 return nil 1471 } 1472 if node.SecretID != args.SecretID { 1473 setError(fmt.Errorf("SecretID mismatch"), false) 1474 return nil 1475 } 1476 1477 alloc, err := snap.AllocByID(ws, args.AllocID) 1478 if err != nil { 1479 setError(err, false) 1480 return nil 1481 } 1482 if alloc == nil { 1483 setError(fmt.Errorf("Allocation %q does not exist", args.AllocID), false) 1484 return nil 1485 } 1486 if alloc.NodeID != args.NodeID { 1487 setError(fmt.Errorf("Allocation %q not running on Node %q", args.AllocID, args.NodeID), false) 1488 return nil 1489 } 1490 if alloc.TerminalStatus() { 1491 setError(fmt.Errorf("Can't request Vault token for terminal allocation"), false) 1492 return nil 1493 } 1494 1495 // Check the policies 1496 policies := alloc.Job.VaultPolicies() 1497 if policies == nil { 1498 setError(fmt.Errorf("Job doesn't require Vault policies"), false) 1499 return nil 1500 } 1501 tg, ok := policies[alloc.TaskGroup] 1502 if !ok { 1503 setError(fmt.Errorf("Task group does not require Vault policies"), false) 1504 return nil 1505 } 1506 1507 var unneeded []string 1508 for _, task := range args.Tasks { 1509 taskVault := tg[task] 1510 if taskVault == nil || len(taskVault.Policies) == 0 { 1511 unneeded = append(unneeded, task) 1512 } 1513 } 1514 1515 if len(unneeded) != 0 { 1516 e := fmt.Errorf("Requested Vault tokens for tasks without defined Vault policies: %s", 1517 strings.Join(unneeded, ", ")) 1518 setError(e, false) 1519 return nil 1520 } 1521 1522 // At this point the request is valid and we should contact Vault for 1523 // tokens. 1524 1525 // Create an error group where we will spin up a fixed set of goroutines to 1526 // handle deriving tokens but where if any fails the whole group is 1527 // canceled. 1528 g, ctx := errgroup.WithContext(context.Background()) 1529 1530 // Cap the handlers 1531 handlers := len(args.Tasks) 1532 if handlers > maxParallelRequestsPerDerive { 1533 handlers = maxParallelRequestsPerDerive 1534 } 1535 1536 // Create the Vault Tokens 1537 input := make(chan string, handlers) 1538 results := make(map[string]*vapi.Secret, len(args.Tasks)) 1539 for i := 0; i < handlers; i++ { 1540 g.Go(func() error { 1541 for { 1542 select { 1543 case task, ok := <-input: 1544 if !ok { 1545 return nil 1546 } 1547 1548 secret, err := n.srv.vault.CreateToken(ctx, alloc, task) 1549 if err != nil { 1550 return err 1551 } 1552 1553 results[task] = secret 1554 case <-ctx.Done(): 1555 return nil 1556 } 1557 } 1558 }) 1559 } 1560 1561 // Send the input 1562 go func() { 1563 defer close(input) 1564 for _, task := range args.Tasks { 1565 select { 1566 case <-ctx.Done(): 1567 return 1568 case input <- task: 1569 } 1570 } 1571 }() 1572 1573 // Wait for everything to complete or for an error 1574 createErr := g.Wait() 1575 1576 // Retrieve the results 1577 accessors := make([]*structs.VaultAccessor, 0, len(results)) 1578 tokens := make(map[string]string, len(results)) 1579 for task, secret := range results { 1580 w := secret.WrapInfo 1581 tokens[task] = w.Token 1582 accessor := &structs.VaultAccessor{ 1583 Accessor: w.WrappedAccessor, 1584 Task: task, 1585 NodeID: alloc.NodeID, 1586 AllocID: alloc.ID, 1587 CreationTTL: w.TTL, 1588 } 1589 1590 accessors = append(accessors, accessor) 1591 } 1592 1593 // If there was an error revoke the created tokens 1594 if createErr != nil { 1595 n.logger.Error("Vault token creation for alloc failed", "alloc_id", alloc.ID, "error", createErr) 1596 1597 if revokeErr := n.srv.vault.RevokeTokens(context.Background(), accessors, false); revokeErr != nil { 1598 n.logger.Error("Vault token revocation for alloc failed", "alloc_id", alloc.ID, "error", revokeErr) 1599 } 1600 1601 if rerr, ok := createErr.(*structs.RecoverableError); ok { 1602 reply.Error = rerr 1603 } else { 1604 reply.Error = structs.NewRecoverableError(createErr, false).(*structs.RecoverableError) 1605 } 1606 1607 return nil 1608 } 1609 1610 // Commit to Raft before returning any of the tokens 1611 req := structs.VaultAccessorsRequest{Accessors: accessors} 1612 _, index, err := n.srv.raftApply(structs.VaultAccessorRegisterRequestType, &req) 1613 if err != nil { 1614 n.logger.Error("registering Vault accessors for alloc failed", "alloc_id", alloc.ID, "error", err) 1615 1616 // Determine if we can recover from the error 1617 retry := false 1618 switch err { 1619 case raft.ErrNotLeader, raft.ErrLeadershipLost, raft.ErrRaftShutdown, raft.ErrEnqueueTimeout: 1620 retry = true 1621 } 1622 1623 setError(err, retry) 1624 return nil 1625 } 1626 1627 reply.Index = index 1628 reply.Tasks = tokens 1629 n.srv.setQueryMeta(&reply.QueryMeta) 1630 return nil 1631} 1632 1633type connectTask struct { 1634 TaskKind structs.TaskKind 1635 TaskName string 1636} 1637 1638func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.DeriveSITokenResponse) error { 1639 setError := func(e error, recoverable bool) { 1640 if e != nil { 1641 if re, ok := e.(*structs.RecoverableError); ok { 1642 reply.Error = re // No need to wrap if error is already a RecoverableError 1643 } else { 1644 reply.Error = structs.NewRecoverableError(e, recoverable).(*structs.RecoverableError) 1645 } 1646 n.logger.Error("DeriveSIToken failed", "recoverable", recoverable, "error", e) 1647 } 1648 } 1649 1650 if done, err := n.srv.forward("Node.DeriveSIToken", args, args, reply); done { 1651 setError(err, structs.IsRecoverable(err) || err == structs.ErrNoLeader) 1652 return nil 1653 } 1654 defer metrics.MeasureSince([]string{"nomad", "client", "derive_si_token"}, time.Now()) 1655 1656 // Verify the arguments 1657 if err := args.Validate(); err != nil { 1658 setError(err, false) 1659 return nil 1660 } 1661 1662 // Get the ClusterID 1663 clusterID, err := n.srv.ClusterID() 1664 if err != nil { 1665 setError(err, false) 1666 return nil 1667 } 1668 1669 // Verify the following: 1670 // * The Node exists and has the correct SecretID. 1671 // * The Allocation exists on the specified Node. 1672 // * The Allocation contains the given tasks, and each task requires a 1673 // SI token. 1674 1675 snap, err := n.srv.fsm.State().Snapshot() 1676 if err != nil { 1677 setError(err, false) 1678 return nil 1679 } 1680 node, err := snap.NodeByID(nil, args.NodeID) 1681 if err != nil { 1682 setError(err, false) 1683 return nil 1684 } 1685 if node == nil { 1686 setError(errors.Errorf("Node %q does not exist", args.NodeID), false) 1687 return nil 1688 } 1689 if node.SecretID != args.SecretID { 1690 setError(errors.Errorf("SecretID mismatch"), false) 1691 return nil 1692 } 1693 1694 alloc, err := snap.AllocByID(nil, args.AllocID) 1695 if err != nil { 1696 setError(err, false) 1697 return nil 1698 } 1699 if alloc == nil { 1700 setError(errors.Errorf("Allocation %q does not exist", args.AllocID), false) 1701 return nil 1702 } 1703 if alloc.NodeID != args.NodeID { 1704 setError(errors.Errorf("Allocation %q not running on node %q", args.AllocID, args.NodeID), false) 1705 return nil 1706 } 1707 if alloc.TerminalStatus() { 1708 setError(errors.Errorf("Cannot request SI token for terminal allocation"), false) 1709 return nil 1710 } 1711 1712 // make sure task group contains at least one connect enabled service 1713 tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) 1714 if tg == nil { 1715 setError(errors.Errorf("Allocation %q does not contain TaskGroup %q", args.AllocID, alloc.TaskGroup), false) 1716 return nil 1717 } 1718 if !tg.UsesConnect() { 1719 setError(errors.Errorf("TaskGroup %q does not use Connect", tg.Name), false) 1720 return nil 1721 } 1722 1723 // make sure each task in args.Tasks is a connect-enabled task 1724 notConnect, tasks := connectTasks(tg, args.Tasks) 1725 if len(notConnect) > 0 { 1726 setError(fmt.Errorf( 1727 "Requested Consul Service Identity tokens for tasks that are not Connect enabled: %v", 1728 strings.Join(notConnect, ", "), 1729 ), false) 1730 } 1731 1732 // At this point the request is valid and we should contact Consul for tokens. 1733 1734 // A lot of the following is copied from DeriveVaultToken which has been 1735 // working fine for years. 1736 1737 // Create an error group where we will spin up a fixed set of goroutines to 1738 // handle deriving tokens but where if any fails the whole group is 1739 // canceled. 1740 g, ctx := errgroup.WithContext(context.Background()) 1741 1742 // Cap the worker threads 1743 numWorkers := len(args.Tasks) 1744 if numWorkers > maxParallelRequestsPerDerive { 1745 numWorkers = maxParallelRequestsPerDerive 1746 } 1747 1748 // would like to pull some of this out... 1749 1750 // Create the SI tokens from a slice of task name + connect service 1751 input := make(chan connectTask, numWorkers) 1752 results := make(map[string]*structs.SIToken, numWorkers) 1753 for i := 0; i < numWorkers; i++ { 1754 g.Go(func() error { 1755 for { 1756 select { 1757 case task, ok := <-input: 1758 if !ok { 1759 return nil 1760 } 1761 secret, err := n.srv.consulACLs.CreateToken(ctx, ServiceIdentityRequest{ 1762 ConsulNamespace: tg.Consul.GetNamespace(), 1763 TaskKind: task.TaskKind, 1764 TaskName: task.TaskName, 1765 ClusterID: clusterID, 1766 AllocID: alloc.ID, 1767 }) 1768 if err != nil { 1769 return err 1770 } 1771 results[task.TaskName] = secret 1772 case <-ctx.Done(): 1773 return nil 1774 } 1775 } 1776 }) 1777 } 1778 1779 // Send the input 1780 go func() { 1781 defer close(input) 1782 for _, connectTask := range tasks { 1783 select { 1784 case <-ctx.Done(): 1785 return 1786 case input <- connectTask: 1787 } 1788 } 1789 }() 1790 1791 // Wait for everything to complete or for an error 1792 createErr := g.Wait() 1793 1794 accessors := make([]*structs.SITokenAccessor, 0, len(results)) 1795 tokens := make(map[string]string, len(results)) 1796 for task, secret := range results { 1797 tokens[task] = secret.SecretID 1798 accessor := &structs.SITokenAccessor{ 1799 ConsulNamespace: tg.Consul.GetNamespace(), 1800 NodeID: alloc.NodeID, 1801 AllocID: alloc.ID, 1802 TaskName: task, 1803 AccessorID: secret.AccessorID, 1804 } 1805 accessors = append(accessors, accessor) 1806 } 1807 1808 // If there was an error, revoke all created tokens. These tokens have not 1809 // yet been committed to the persistent store. 1810 if createErr != nil { 1811 n.logger.Error("Consul Service Identity token creation for alloc failed", "alloc_id", alloc.ID, "error", createErr) 1812 _ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, false) 1813 1814 if recoverable, ok := createErr.(*structs.RecoverableError); ok { 1815 reply.Error = recoverable 1816 } else { 1817 reply.Error = structs.NewRecoverableError(createErr, false).(*structs.RecoverableError) 1818 } 1819 1820 return nil 1821 } 1822 1823 // Commit the derived tokens to raft before returning them 1824 requested := structs.SITokenAccessorsRequest{Accessors: accessors} 1825 _, index, err := n.srv.raftApply(structs.ServiceIdentityAccessorRegisterRequestType, &requested) 1826 if err != nil { 1827 n.logger.Error("registering Service Identity token accessors for alloc failed", "alloc_id", alloc.ID, "error", err) 1828 1829 // Determine if we can recover from the error 1830 retry := false 1831 switch err { 1832 case raft.ErrNotLeader, raft.ErrLeadershipLost, raft.ErrRaftShutdown, raft.ErrEnqueueTimeout: 1833 retry = true 1834 } 1835 setError(err, retry) 1836 return nil 1837 } 1838 1839 // We made it! Now we can set the reply. 1840 reply.Index = index 1841 reply.Tokens = tokens 1842 n.srv.setQueryMeta(&reply.QueryMeta) 1843 return nil 1844} 1845 1846func connectTasks(tg *structs.TaskGroup, tasks []string) ([]string, []connectTask) { 1847 var notConnect []string 1848 var usesConnect []connectTask 1849 for _, task := range tasks { 1850 tgTask := tg.LookupTask(task) 1851 if !taskUsesConnect(tgTask) { 1852 notConnect = append(notConnect, task) 1853 } else { 1854 usesConnect = append(usesConnect, connectTask{ 1855 TaskName: task, 1856 TaskKind: tgTask.Kind, 1857 }) 1858 } 1859 } 1860 return notConnect, usesConnect 1861} 1862 1863func taskUsesConnect(task *structs.Task) bool { 1864 if task == nil { 1865 // not even in the task group 1866 return false 1867 } 1868 return task.UsesConnect() 1869} 1870 1871func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error { 1872 if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done { 1873 return err 1874 } 1875 defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now()) 1876 1877 if len(args.NodeEvents) == 0 { 1878 return fmt.Errorf("no node events given") 1879 } 1880 for nodeID, events := range args.NodeEvents { 1881 if len(events) == 0 { 1882 return fmt.Errorf("no node events given for node %q", nodeID) 1883 } 1884 } 1885 1886 _, index, err := n.srv.raftApply(structs.UpsertNodeEventsType, args) 1887 if err != nil { 1888 n.logger.Error("upserting node events failed", "error", err) 1889 return err 1890 } 1891 1892 reply.Index = index 1893 return nil 1894} 1895