1package nomad
2
3import (
4	"context"
5	"fmt"
6	"strings"
7	"sync"
8	"time"
9
10	"golang.org/x/sync/errgroup"
11
12	metrics "github.com/armon/go-metrics"
13	log "github.com/hashicorp/go-hclog"
14	memdb "github.com/hashicorp/go-memdb"
15	multierror "github.com/hashicorp/go-multierror"
16	vapi "github.com/hashicorp/vault/api"
17
18	"github.com/hashicorp/nomad/acl"
19	"github.com/hashicorp/nomad/helper/uuid"
20	"github.com/hashicorp/nomad/nomad/state"
21	"github.com/hashicorp/nomad/nomad/structs"
22	"github.com/hashicorp/raft"
23	"github.com/pkg/errors"
24)
25
const (
	// batchUpdateInterval is how long we wait to batch updates
	batchUpdateInterval = 50 * time.Millisecond

	// maxParallelRequestsPerDerive is the maximum number of parallel Vault
	// create token requests that may be outstanding per derive request
	maxParallelRequestsPerDerive = 16

	// NodeDrainEvents are the various drain messages attached to node events
	// when a node's drain strategy is set, disabled, or updated.
	NodeDrainEventDrainSet      = "Node drain strategy set"
	NodeDrainEventDrainDisabled = "Node drain disabled"
	NodeDrainEventDrainUpdated  = "Node drain strategy updated"

	// NodeEligibilityEventEligible is used when the node's eligibility is
	// marked eligible
	NodeEligibilityEventEligible = "Node marked as eligible for scheduling"

	// NodeEligibilityEventIneligible is used when the node's eligibility is
	// marked ineligible
	NodeEligibilityEventIneligible = "Node marked as ineligible for scheduling"

	// NodeHeartbeatEventReregistered is the message used when the node becomes
	// reregistered by the heartbeat.
	NodeHeartbeatEventReregistered = "Node reregistered by heartbeat"
)
51
// Node is the endpoint used for client interactions. Besides the server
// handle and logger it carries the state needed to batch pending allocation
// updates (updates, evals, updateFuture, updateTimer), all of which are
// guarded by updatesLock.
type Node struct {
	// srv is the server this endpoint operates on; logger is the endpoint's
	// logger.
	srv    *Server
	logger log.Logger

	// ctx provides context regarding the underlying connection
	ctx *RPCContext

	// updates holds pending client status updates for allocations
	updates []*structs.Allocation

	// evals holds pending rescheduling eval updates triggered by failed allocations
	evals []*structs.Evaluation

	// updateFuture is used to wait for the pending batch update
	// to complete. This may be nil if no batch is pending.
	updateFuture *structs.BatchFuture

	// updateTimer is the timer that will trigger the next batch
	// update, and may be nil if there is no batch pending.
	updateTimer *time.Timer

	// updatesLock synchronizes access to the updates list,
	// the future and the timer.
	updatesLock sync.Mutex
}
78
79// Register is used to upsert a client that is available for scheduling
80func (n *Node) Register(args *structs.NodeRegisterRequest, reply *structs.NodeUpdateResponse) error {
81	isForwarded := args.IsForwarded()
82	if done, err := n.srv.forward("Node.Register", args, args, reply); done {
83		// We have a valid node connection since there is no error from the
84		// forwarded server, so add the mapping to cache the
85		// connection and allow the server to send RPCs to the client.
86		if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded {
87			n.ctx.NodeID = args.Node.ID
88			n.srv.addNodeConn(n.ctx)
89		}
90
91		return err
92	}
93	defer metrics.MeasureSince([]string{"nomad", "client", "register"}, time.Now())
94
95	// Validate the arguments
96	if args.Node == nil {
97		return fmt.Errorf("missing node for client registration")
98	}
99	if args.Node.ID == "" {
100		return fmt.Errorf("missing node ID for client registration")
101	}
102	if args.Node.Datacenter == "" {
103		return fmt.Errorf("missing datacenter for client registration")
104	}
105	if args.Node.Name == "" {
106		return fmt.Errorf("missing node name for client registration")
107	}
108	if len(args.Node.Attributes) == 0 {
109		return fmt.Errorf("missing attributes for client registration")
110	}
111	if args.Node.SecretID == "" {
112		return fmt.Errorf("missing node secret ID for client registration")
113	}
114
115	// Default the status if none is given
116	if args.Node.Status == "" {
117		args.Node.Status = structs.NodeStatusInit
118	}
119	if !structs.ValidNodeStatus(args.Node.Status) {
120		return fmt.Errorf("invalid status for node")
121	}
122
123	// Default to eligible for scheduling if unset
124	if args.Node.SchedulingEligibility == "" {
125		args.Node.SchedulingEligibility = structs.NodeSchedulingEligible
126	}
127
128	// Set the timestamp when the node is registered
129	args.Node.StatusUpdatedAt = time.Now().Unix()
130
131	// Compute the node class
132	if err := args.Node.ComputeClass(); err != nil {
133		return fmt.Errorf("failed to computed node class: %v", err)
134	}
135
136	// Look for the node so we can detect a state transition
137	snap, err := n.srv.fsm.State().Snapshot()
138	if err != nil {
139		return err
140	}
141
142	ws := memdb.NewWatchSet()
143	originalNode, err := snap.NodeByID(ws, args.Node.ID)
144	if err != nil {
145		return err
146	}
147
148	// Check if the SecretID has been tampered with
149	if originalNode != nil {
150		if args.Node.SecretID != originalNode.SecretID && originalNode.SecretID != "" {
151			return fmt.Errorf("node secret ID does not match. Not registering node.")
152		}
153	}
154
155	// We have a valid node connection, so add the mapping to cache the
156	// connection and allow the server to send RPCs to the client. We only cache
157	// the connection if it is not being forwarded from another server.
158	if n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() {
159		n.ctx.NodeID = args.Node.ID
160		n.srv.addNodeConn(n.ctx)
161	}
162
163	// Commit this update via Raft
164	_, index, err := n.srv.raftApply(structs.NodeRegisterRequestType, args)
165	if err != nil {
166		n.logger.Error("register failed", "error", err)
167		return err
168	}
169	reply.NodeModifyIndex = index
170
171	// Check if we should trigger evaluations
172	originalStatus := structs.NodeStatusInit
173	if originalNode != nil {
174		originalStatus = originalNode.Status
175	}
176	transitionToReady := transitionedToReady(args.Node.Status, originalStatus)
177	if structs.ShouldDrainNode(args.Node.Status) || transitionToReady {
178		evalIDs, evalIndex, err := n.createNodeEvals(args.Node.ID, index)
179		if err != nil {
180			n.logger.Error("eval creation failed", "error", err)
181			return err
182		}
183		reply.EvalIDs = evalIDs
184		reply.EvalCreateIndex = evalIndex
185	}
186
187	// Check if we need to setup a heartbeat
188	if !args.Node.TerminalStatus() {
189		ttl, err := n.srv.resetHeartbeatTimer(args.Node.ID)
190		if err != nil {
191			n.logger.Error("heartbeat reset failed", "error", err)
192			return err
193		}
194		reply.HeartbeatTTL = ttl
195	}
196
197	// Set the reply index
198	reply.Index = index
199	snap, err = n.srv.fsm.State().Snapshot()
200	if err != nil {
201		return err
202	}
203
204	n.srv.peerLock.RLock()
205	defer n.srv.peerLock.RUnlock()
206	if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
207		n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
208		return err
209	}
210
211	return nil
212}
213
214// updateNodeUpdateResponse assumes the n.srv.peerLock is held for reading.
215func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply *structs.NodeUpdateResponse) error {
216	reply.LeaderRPCAddr = string(n.srv.raft.Leader())
217
218	// Reply with config information required for future RPC requests
219	reply.Servers = make([]*structs.NodeServerInfo, 0, len(n.srv.localPeers))
220	for _, v := range n.srv.localPeers {
221		reply.Servers = append(reply.Servers,
222			&structs.NodeServerInfo{
223				RPCAdvertiseAddr: v.RPCAddr.String(),
224				RPCMajorVersion:  int32(v.MajorVersion),
225				RPCMinorVersion:  int32(v.MinorVersion),
226				Datacenter:       v.Datacenter,
227			})
228	}
229
230	// TODO(sean@): Use an indexed node count instead
231	//
232	// Snapshot is used only to iterate over all nodes to create a node
233	// count to send back to Nomad Clients in their heartbeat so Clients
234	// can estimate the size of the cluster.
235	ws := memdb.NewWatchSet()
236	iter, err := snap.Nodes(ws)
237	if err == nil {
238		for {
239			raw := iter.Next()
240			if raw == nil {
241				break
242			}
243			reply.NumNodes++
244		}
245	}
246
247	reply.Features = n.srv.EnterpriseState.Features()
248
249	return nil
250}
251
252// Deregister is used to remove a client from the cluster. If a client should
253// just be made unavailable for scheduling, a status update is preferred.
254func (n *Node) Deregister(args *structs.NodeDeregisterRequest, reply *structs.NodeUpdateResponse) error {
255	if done, err := n.srv.forward("Node.Deregister", args, args, reply); done {
256		return err
257	}
258	defer metrics.MeasureSince([]string{"nomad", "client", "deregister"}, time.Now())
259
260	if args.NodeID == "" {
261		return fmt.Errorf("missing node ID for client deregistration")
262	}
263
264	// deregister takes a batch
265	repack := &structs.NodeBatchDeregisterRequest{
266		NodeIDs:      []string{args.NodeID},
267		WriteRequest: args.WriteRequest,
268	}
269
270	return n.deregister(repack, reply, func() (interface{}, uint64, error) {
271		return n.srv.raftApply(structs.NodeDeregisterRequestType, args)
272	})
273}
274
275// BatchDeregister is used to remove client nodes from the cluster.
276func (n *Node) BatchDeregister(args *structs.NodeBatchDeregisterRequest, reply *structs.NodeUpdateResponse) error {
277	if done, err := n.srv.forward("Node.BatchDeregister", args, args, reply); done {
278		return err
279	}
280	defer metrics.MeasureSince([]string{"nomad", "client", "batch_deregister"}, time.Now())
281
282	if len(args.NodeIDs) == 0 {
283		return fmt.Errorf("missing node IDs for client deregistration")
284	}
285
286	return n.deregister(args, reply, func() (interface{}, uint64, error) {
287		return n.srv.raftApply(structs.NodeBatchDeregisterRequestType, args)
288	})
289}
290
291// deregister takes a raftMessage closure, to support both Deregister and BatchDeregister
292func (n *Node) deregister(args *structs.NodeBatchDeregisterRequest,
293	reply *structs.NodeUpdateResponse,
294	raftApplyFn func() (interface{}, uint64, error),
295) error {
296	// Check request permissions
297	if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
298		return err
299	} else if aclObj != nil && !aclObj.AllowNodeWrite() {
300		return structs.ErrPermissionDenied
301	}
302
303	// Look for the node
304	snap, err := n.srv.fsm.State().Snapshot()
305	if err != nil {
306		return err
307	}
308
309	ws := memdb.NewWatchSet()
310	for _, nodeID := range args.NodeIDs {
311		node, err := snap.NodeByID(ws, nodeID)
312		if err != nil {
313			return err
314		}
315		if node == nil {
316			return fmt.Errorf("node not found")
317		}
318	}
319
320	// Commit this update via Raft
321	_, index, err := raftApplyFn()
322	if err != nil {
323		n.logger.Error("raft message failed", "error", err)
324		return err
325	}
326
327	for _, nodeID := range args.NodeIDs {
328		// Clear the heartbeat timer if any
329		n.srv.clearHeartbeatTimer(nodeID)
330
331		// Create the evaluations for this node
332		evalIDs, evalIndex, err := n.createNodeEvals(nodeID, index)
333		if err != nil {
334			n.logger.Error("eval creation failed", "error", err)
335			return err
336		}
337
338		// Determine if there are any Vault accessors on the node
339		if accessors, err := snap.VaultAccessorsByNode(ws, nodeID); err != nil {
340			n.logger.Error("looking up vault accessors for node failed", "node_id", nodeID, "error", err)
341			return err
342		} else if l := len(accessors); l > 0 {
343			n.logger.Debug("revoking vault accessors on node due to deregister", "num_accessors", l, "node_id", nodeID)
344			if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil {
345				n.logger.Error("revoking vault accessors for node failed", "node_id", nodeID, "error", err)
346				return err
347			}
348		}
349
350		// Determine if there are any SI token accessors on the node
351		if accessors, err := snap.SITokenAccessorsByNode(ws, nodeID); err != nil {
352			n.logger.Error("looking up si accessors for node failed", "node_id", nodeID, "error", err)
353			return err
354		} else if l := len(accessors); l > 0 {
355			n.logger.Debug("revoking si accessors on node due to deregister", "num_accessors", l, "node_id", nodeID)
356			// Unlike with the Vault integration, there's no error returned here, since
357			// bootstrapping the Consul client is elsewhere. Errors in revocation trigger
358			// background retry attempts rather than inline error handling.
359			_ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, true)
360		}
361
362		reply.EvalIDs = append(reply.EvalIDs, evalIDs...)
363		// Set the reply eval create index just the first time
364		if reply.EvalCreateIndex == 0 {
365			reply.EvalCreateIndex = evalIndex
366		}
367	}
368
369	reply.NodeModifyIndex = index
370	reply.Index = index
371	return nil
372}
373
// UpdateStatus is used to update the status of a client node.
//
// When n.srv.forward reports the request as done, its error is returned; on a
// successful, non-forwarded call the connection is first cached under the node
// ID. Otherwise the arguments are validated, the node is looked up, and the
// status change is committed via Raft only if it differs from the stored
// status. Node evaluations are created when the new status requires draining
// or is a transition to ready. A node going down has its Vault and SI token
// accessors revoked; any other status resets the heartbeat timer. Finally
// reply is populated with index and server information.
func (n *Node) UpdateStatus(args *structs.NodeUpdateStatusRequest, reply *structs.NodeUpdateResponse) error {
	// Capture the forwarded flag before calling forward.
	isForwarded := args.IsForwarded()
	if done, err := n.srv.forward("Node.UpdateStatus", args, args, reply); done {
		// We have a valid node connection since there is no error from the
		// forwarded server, so add the mapping to cache the
		// connection and allow the server to send RPCs to the client.
		if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded {
			n.ctx.NodeID = args.NodeID
			n.srv.addNodeConn(n.ctx)
		}

		return err
	}
	defer metrics.MeasureSince([]string{"nomad", "client", "update_status"}, time.Now())

	// Verify the arguments
	if args.NodeID == "" {
		return fmt.Errorf("missing node ID for client status update")
	}
	if !structs.ValidNodeStatus(args.Status) {
		return fmt.Errorf("invalid status for node")
	}

	// Look for the node
	snap, err := n.srv.fsm.State().Snapshot()
	if err != nil {
		return err
	}

	ws := memdb.NewWatchSet()
	node, err := snap.NodeByID(ws, args.NodeID)
	if err != nil {
		return err
	}
	if node == nil {
		return fmt.Errorf("node not found")
	}

	// We have a valid node connection, so add the mapping to cache the
	// connection and allow the server to send RPCs to the client. We only cache
	// the connection if it is not being forwarded from another server.
	if n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() {
		n.ctx.NodeID = args.NodeID
		n.srv.addNodeConn(n.ctx)
	}

	// XXX: Could use the SecretID here but have to update the heartbeat system
	// to track SecretIDs.

	// Update the timestamp of when the node status was updated
	args.UpdatedAt = time.Now().Unix()

	// Commit this update via Raft, but only when the status actually changes.
	// index stays zero when nothing is applied.
	var index uint64
	if node.Status != args.Status {
		// Attach a "reregistered by heartbeat" event when the stored status is
		// down and the caller did not supply an event of its own.
		if node.Status == structs.NodeStatusDown && args.NodeEvent == nil {
			args.NodeEvent = structs.NewNodeEvent().
				SetSubsystem(structs.NodeEventSubsystemCluster).
				SetMessage(NodeHeartbeatEventReregistered)
		}

		_, index, err = n.srv.raftApply(structs.NodeUpdateStatusRequestType, args)
		if err != nil {
			n.logger.Error("status update failed", "error", err)
			return err
		}
		reply.NodeModifyIndex = index
	}

	// Check if we should trigger evaluations
	transitionToReady := transitionedToReady(args.Status, node.Status)
	if structs.ShouldDrainNode(args.Status) || transitionToReady {
		evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
		if err != nil {
			n.logger.Error("eval creation failed", "error", err)
			return err
		}
		reply.EvalIDs = evalIDs
		reply.EvalCreateIndex = evalIndex
	}

	// A down node gets its token accessors revoked; every other status resets
	// the heartbeat timer and reports the TTL back to the client.
	switch args.Status {
	case structs.NodeStatusDown:
		// Determine if there are any Vault accessors on the node to cleanup.
		// Note this reads from n.srv.State() rather than the earlier snapshot.
		if accessors, err := n.srv.State().VaultAccessorsByNode(ws, args.NodeID); err != nil {
			n.logger.Error("looking up vault accessors for node failed", "node_id", args.NodeID, "error", err)
			return err
		} else if l := len(accessors); l > 0 {
			n.logger.Debug("revoking vault accessors on node due to down state", "num_accessors", l, "node_id", args.NodeID)
			if err := n.srv.vault.RevokeTokens(context.Background(), accessors, true); err != nil {
				n.logger.Error("revoking vault accessors for node failed", "node_id", args.NodeID, "error", err)
				return err
			}
		}

		// Determine if there are any SI token accessors on the node to cleanup
		if accessors, err := n.srv.State().SITokenAccessorsByNode(ws, args.NodeID); err != nil {
			n.logger.Error("looking up SI accessors for node failed", "node_id", args.NodeID, "error", err)
			return err
		} else if l := len(accessors); l > 0 {
			n.logger.Debug("revoking SI accessors on node due to down state", "num_accessors", l, "node_id", args.NodeID)
			// The revocation result is deliberately discarded here.
			_ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, true)
		}
	default:
		ttl, err := n.srv.resetHeartbeatTimer(args.NodeID)
		if err != nil {
			n.logger.Error("heartbeat reset failed", "error", err)
			return err
		}
		reply.HeartbeatTTL = ttl
	}

	// Set the reply index and leader. constructNodeServerInfoResponse requires
	// peerLock to be held for reading.
	reply.Index = index
	n.srv.peerLock.RLock()
	defer n.srv.peerLock.RUnlock()
	if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
		n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
		return err
	}

	return nil
}
501
502// transitionedToReady is a helper that takes a nodes new and old status and
503// returns whether it has transitioned to ready.
504func transitionedToReady(newStatus, oldStatus string) bool {
505	initToReady := oldStatus == structs.NodeStatusInit && newStatus == structs.NodeStatusReady
506	terminalToReady := oldStatus == structs.NodeStatusDown && newStatus == structs.NodeStatusReady
507	return initToReady || terminalToReady
508}
509
510// UpdateDrain is used to update the drain mode of a client node
511func (n *Node) UpdateDrain(args *structs.NodeUpdateDrainRequest,
512	reply *structs.NodeDrainUpdateResponse) error {
513	if done, err := n.srv.forward("Node.UpdateDrain", args, args, reply); done {
514		return err
515	}
516	defer metrics.MeasureSince([]string{"nomad", "client", "update_drain"}, time.Now())
517
518	// Check node write permissions
519	if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
520		return err
521	} else if aclObj != nil && !aclObj.AllowNodeWrite() {
522		return structs.ErrPermissionDenied
523	}
524
525	// Verify the arguments
526	if args.NodeID == "" {
527		return fmt.Errorf("missing node ID for drain update")
528	}
529	if args.NodeEvent != nil {
530		return fmt.Errorf("node event must not be set")
531	}
532
533	// Look for the node
534	snap, err := n.srv.fsm.State().Snapshot()
535	if err != nil {
536		return err
537	}
538	node, err := snap.NodeByID(nil, args.NodeID)
539	if err != nil {
540		return err
541	}
542	if node == nil {
543		return fmt.Errorf("node not found")
544	}
545
546	now := time.Now().UTC()
547
548	// Update the timestamp of when the node status was updated
549	args.UpdatedAt = now.Unix()
550
551	// Setup drain strategy
552	if args.DrainStrategy != nil {
553		// Mark start time for the drain
554		if node.DrainStrategy == nil {
555			args.DrainStrategy.StartedAt = now
556		} else {
557			args.DrainStrategy.StartedAt = node.DrainStrategy.StartedAt
558		}
559
560		// Mark the deadline time
561		if args.DrainStrategy.Deadline.Nanoseconds() > 0 {
562			args.DrainStrategy.ForceDeadline = now.Add(args.DrainStrategy.Deadline)
563		}
564	}
565
566	// Construct the node event
567	args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemDrain)
568	if node.DrainStrategy == nil && args.DrainStrategy != nil {
569		args.NodeEvent.SetMessage(NodeDrainEventDrainSet)
570	} else if node.DrainStrategy != nil && args.DrainStrategy != nil {
571		args.NodeEvent.SetMessage(NodeDrainEventDrainUpdated)
572	} else if node.DrainStrategy != nil && args.DrainStrategy == nil {
573		args.NodeEvent.SetMessage(NodeDrainEventDrainDisabled)
574	} else {
575		args.NodeEvent = nil
576	}
577
578	// Commit this update via Raft
579	_, index, err := n.srv.raftApply(structs.NodeUpdateDrainRequestType, args)
580	if err != nil {
581		n.logger.Error("drain update failed", "error", err)
582		return err
583	}
584	reply.NodeModifyIndex = index
585
586	// If the node is transitioning to be eligible, create Node evaluations
587	// because there may be a System job registered that should be evaluated.
588	if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.MarkEligible && args.DrainStrategy == nil {
589		evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
590		if err != nil {
591			n.logger.Error("eval creation failed", "error", err)
592			return err
593		}
594		reply.EvalIDs = evalIDs
595		reply.EvalCreateIndex = evalIndex
596	}
597
598	// Set the reply index
599	reply.Index = index
600	return nil
601}
602
603// UpdateEligibility is used to update the scheduling eligibility of a node
604func (n *Node) UpdateEligibility(args *structs.NodeUpdateEligibilityRequest,
605	reply *structs.NodeEligibilityUpdateResponse) error {
606	if done, err := n.srv.forward("Node.UpdateEligibility", args, args, reply); done {
607		return err
608	}
609	defer metrics.MeasureSince([]string{"nomad", "client", "update_eligibility"}, time.Now())
610
611	// Check node write permissions
612	if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
613		return err
614	} else if aclObj != nil && !aclObj.AllowNodeWrite() {
615		return structs.ErrPermissionDenied
616	}
617
618	// Verify the arguments
619	if args.NodeID == "" {
620		return fmt.Errorf("missing node ID for setting scheduling eligibility")
621	}
622	if args.NodeEvent != nil {
623		return fmt.Errorf("node event must not be set")
624	}
625
626	// Check that only allowed types are set
627	switch args.Eligibility {
628	case structs.NodeSchedulingEligible, structs.NodeSchedulingIneligible:
629	default:
630		return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
631	}
632
633	// Look for the node
634	snap, err := n.srv.fsm.State().Snapshot()
635	if err != nil {
636		return err
637	}
638	node, err := snap.NodeByID(nil, args.NodeID)
639	if err != nil {
640		return err
641	}
642	if node == nil {
643		return fmt.Errorf("node not found")
644	}
645
646	if node.DrainStrategy != nil && args.Eligibility == structs.NodeSchedulingEligible {
647		return fmt.Errorf("can not set node's scheduling eligibility to eligible while it is draining")
648	}
649
650	switch args.Eligibility {
651	case structs.NodeSchedulingEligible, structs.NodeSchedulingIneligible:
652	default:
653		return fmt.Errorf("invalid scheduling eligibility %q", args.Eligibility)
654	}
655
656	// Update the timestamp of when the node status was updated
657	args.UpdatedAt = time.Now().Unix()
658
659	// Construct the node event
660	args.NodeEvent = structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster)
661	if node.SchedulingEligibility == args.Eligibility {
662		return nil // Nothing to do
663	} else if args.Eligibility == structs.NodeSchedulingEligible {
664		args.NodeEvent.SetMessage(NodeEligibilityEventEligible)
665	} else {
666		args.NodeEvent.SetMessage(NodeEligibilityEventIneligible)
667	}
668
669	// Commit this update via Raft
670	outErr, index, err := n.srv.raftApply(structs.NodeUpdateEligibilityRequestType, args)
671	if err != nil {
672		n.logger.Error("eligibility update failed", "error", err)
673		return err
674	}
675	if outErr != nil {
676		if err, ok := outErr.(error); ok && err != nil {
677			n.logger.Error("eligibility update failed", "error", err)
678			return err
679		}
680	}
681
682	// If the node is transitioning to be eligible, create Node evaluations
683	// because there may be a System job registered that should be evaluated.
684	if node.SchedulingEligibility == structs.NodeSchedulingIneligible && args.Eligibility == structs.NodeSchedulingEligible {
685		evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, index)
686		if err != nil {
687			n.logger.Error("eval creation failed", "error", err)
688			return err
689		}
690		reply.EvalIDs = evalIDs
691		reply.EvalCreateIndex = evalIndex
692	}
693
694	// Set the reply index
695	reply.Index = index
696	return nil
697}
698
699// Evaluate is used to force a re-evaluation of the node
700func (n *Node) Evaluate(args *structs.NodeEvaluateRequest, reply *structs.NodeUpdateResponse) error {
701	if done, err := n.srv.forward("Node.Evaluate", args, args, reply); done {
702		return err
703	}
704	defer metrics.MeasureSince([]string{"nomad", "client", "evaluate"}, time.Now())
705
706	// Check node write permissions
707	if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
708		return err
709	} else if aclObj != nil && !aclObj.AllowNodeWrite() {
710		return structs.ErrPermissionDenied
711	}
712
713	// Verify the arguments
714	if args.NodeID == "" {
715		return fmt.Errorf("missing node ID for evaluation")
716	}
717
718	// Look for the node
719	snap, err := n.srv.fsm.State().Snapshot()
720	if err != nil {
721		return err
722	}
723	ws := memdb.NewWatchSet()
724	node, err := snap.NodeByID(ws, args.NodeID)
725	if err != nil {
726		return err
727	}
728	if node == nil {
729		return fmt.Errorf("node not found")
730	}
731
732	// Create the evaluation
733	evalIDs, evalIndex, err := n.createNodeEvals(args.NodeID, node.ModifyIndex)
734	if err != nil {
735		n.logger.Error("eval creation failed", "error", err)
736		return err
737	}
738	reply.EvalIDs = evalIDs
739	reply.EvalCreateIndex = evalIndex
740
741	// Set the reply index
742	reply.Index = evalIndex
743
744	n.srv.peerLock.RLock()
745	defer n.srv.peerLock.RUnlock()
746	if err := n.constructNodeServerInfoResponse(snap, reply); err != nil {
747		n.logger.Error("failed to populate NodeUpdateResponse", "error", err)
748		return err
749	}
750	return nil
751}
752
753// GetNode is used to request information about a specific node
754func (n *Node) GetNode(args *structs.NodeSpecificRequest,
755	reply *structs.SingleNodeResponse) error {
756	if done, err := n.srv.forward("Node.GetNode", args, args, reply); done {
757		return err
758	}
759	defer metrics.MeasureSince([]string{"nomad", "client", "get_node"}, time.Now())
760
761	// Check node read permissions
762	if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
763		// If ResolveToken had an unexpected error return that
764		if err != structs.ErrTokenNotFound {
765			return err
766		}
767
768		// Attempt to lookup AuthToken as a Node.SecretID since nodes
769		// call this endpoint and don't have an ACL token.
770		node, stateErr := n.srv.fsm.State().NodeBySecretID(nil, args.AuthToken)
771		if stateErr != nil {
772			// Return the original ResolveToken error with this err
773			var merr multierror.Error
774			merr.Errors = append(merr.Errors, err, stateErr)
775			return merr.ErrorOrNil()
776		}
777
778		// Not a node or a valid ACL token
779		if node == nil {
780			return structs.ErrTokenNotFound
781		}
782	} else if aclObj != nil && !aclObj.AllowNodeRead() {
783		return structs.ErrPermissionDenied
784	}
785
786	// Setup the blocking query
787	opts := blockingOptions{
788		queryOpts: &args.QueryOptions,
789		queryMeta: &reply.QueryMeta,
790		run: func(ws memdb.WatchSet, state *state.StateStore) error {
791			// Verify the arguments
792			if args.NodeID == "" {
793				return fmt.Errorf("missing node ID")
794			}
795
796			// Look for the node
797			out, err := state.NodeByID(ws, args.NodeID)
798			if err != nil {
799				return err
800			}
801
802			// Setup the output
803			if out != nil {
804				out = out.Sanitize()
805				reply.Node = out
806				reply.Index = out.ModifyIndex
807			} else {
808				// Use the last index that affected the nodes table
809				index, err := state.Index("nodes")
810				if err != nil {
811					return err
812				}
813				reply.Node = nil
814				reply.Index = index
815			}
816
817			// Set the query response
818			n.srv.setQueryMeta(&reply.QueryMeta)
819			return nil
820		}}
821	return n.srv.blockingRPC(&opts)
822}
823
824// GetAllocs is used to request allocations for a specific node
825func (n *Node) GetAllocs(args *structs.NodeSpecificRequest,
826	reply *structs.NodeAllocsResponse) error {
827	if done, err := n.srv.forward("Node.GetAllocs", args, args, reply); done {
828		return err
829	}
830	defer metrics.MeasureSince([]string{"nomad", "client", "get_allocs"}, time.Now())
831
832	// Check node read and namespace job read permissions
833	aclObj, err := n.srv.ResolveToken(args.AuthToken)
834	if err != nil {
835		return err
836	}
837	if aclObj != nil && !aclObj.AllowNodeRead() {
838		return structs.ErrPermissionDenied
839	}
840
841	// cache namespace perms
842	readableNamespaces := map[string]bool{}
843
844	// readNS is a caching namespace read-job helper
845	readNS := func(ns string) bool {
846		if aclObj == nil {
847			// ACLs are disabled; everything is readable
848			return true
849		}
850
851		if readable, ok := readableNamespaces[ns]; ok {
852			// cache hit
853			return readable
854		}
855
856		// cache miss
857		readable := aclObj.AllowNsOp(ns, acl.NamespaceCapabilityReadJob)
858		readableNamespaces[ns] = readable
859		return readable
860	}
861
862	// Verify the arguments
863	if args.NodeID == "" {
864		return fmt.Errorf("missing node ID")
865	}
866
867	// Setup the blocking query
868	opts := blockingOptions{
869		queryOpts: &args.QueryOptions,
870		queryMeta: &reply.QueryMeta,
871		run: func(ws memdb.WatchSet, state *state.StateStore) error {
872			// Look for the node
873			allocs, err := state.AllocsByNode(ws, args.NodeID)
874			if err != nil {
875				return err
876			}
877
878			// Setup the output
879			if n := len(allocs); n != 0 {
880				reply.Allocs = make([]*structs.Allocation, 0, n)
881				for _, alloc := range allocs {
882					if readNS(alloc.Namespace) {
883						reply.Allocs = append(reply.Allocs, alloc)
884					}
885
886					// Get the max of all allocs since
887					// subsequent requests need to start
888					// from the latest index
889					reply.Index = maxUint64(reply.Index, alloc.ModifyIndex)
890				}
891			} else {
892				reply.Allocs = nil
893
894				// Use the last index that affected the nodes table
895				index, err := state.Index("allocs")
896				if err != nil {
897					return err
898				}
899
900				// Must provide non-zero index to prevent blocking
901				// Index 1 is impossible anyways (due to Raft internals)
902				if index == 0 {
903					reply.Index = 1
904				} else {
905					reply.Index = index
906				}
907			}
908			return nil
909		}}
910	return n.srv.blockingRPC(&opts)
911}
912
913// GetClientAllocs is used to request a lightweight list of alloc modify indexes
914// per allocation.
915func (n *Node) GetClientAllocs(args *structs.NodeSpecificRequest,
916	reply *structs.NodeClientAllocsResponse) error {
917	isForwarded := args.IsForwarded()
918	if done, err := n.srv.forward("Node.GetClientAllocs", args, args, reply); done {
919		// We have a valid node connection since there is no error from the
920		// forwarded server, so add the mapping to cache the
921		// connection and allow the server to send RPCs to the client.
922		if err == nil && n.ctx != nil && n.ctx.NodeID == "" && !isForwarded {
923			n.ctx.NodeID = args.NodeID
924			n.srv.addNodeConn(n.ctx)
925		}
926
927		return err
928	}
929	defer metrics.MeasureSince([]string{"nomad", "client", "get_client_allocs"}, time.Now())
930
931	// Verify the arguments
932	if args.NodeID == "" {
933		return fmt.Errorf("missing node ID")
934	}
935
936	// numOldAllocs is used to detect if there is a garbage collection event
937	// that effects the node. When an allocation is garbage collected, that does
938	// not change the modify index changes and thus the query won't unblock,
939	// even though the set of allocations on the node has changed.
940	var numOldAllocs int
941
942	// Setup the blocking query
943	opts := blockingOptions{
944		queryOpts: &args.QueryOptions,
945		queryMeta: &reply.QueryMeta,
946		run: func(ws memdb.WatchSet, state *state.StateStore) error {
947			// Look for the node
948			node, err := state.NodeByID(ws, args.NodeID)
949			if err != nil {
950				return err
951			}
952
953			var allocs []*structs.Allocation
954			if node != nil {
955				if args.SecretID == "" {
956					return fmt.Errorf("missing node secret ID for client status update")
957				} else if args.SecretID != node.SecretID {
958					return fmt.Errorf("node secret ID does not match")
959				}
960
961				// We have a valid node connection, so add the mapping to cache the
962				// connection and allow the server to send RPCs to the client. We only cache
963				// the connection if it is not being forwarded from another server.
964				if n.ctx != nil && n.ctx.NodeID == "" && !args.IsForwarded() {
965					n.ctx.NodeID = args.NodeID
966					n.srv.addNodeConn(n.ctx)
967				}
968
969				var err error
970				allocs, err = state.AllocsByNode(ws, args.NodeID)
971				if err != nil {
972					return err
973				}
974			}
975
976			reply.Allocs = make(map[string]uint64)
977			reply.MigrateTokens = make(map[string]string)
978
979			// preferTableIndex is used to determine whether we should build the
980			// response index based on the full table indexes versus the modify
981			// indexes of the allocations on the specific node. This is
982			// preferred in the case that the node doesn't yet have allocations
983			// or when we detect a GC that effects the node.
984			preferTableIndex := true
985
986			// Setup the output
987			if numAllocs := len(allocs); numAllocs != 0 {
988				preferTableIndex = false
989
990				for _, alloc := range allocs {
991					reply.Allocs[alloc.ID] = alloc.AllocModifyIndex
992
993					// If the allocation is going to do a migration, create a
994					// migration token so that the client can authenticate with
995					// the node hosting the previous allocation.
996					if alloc.ShouldMigrate() {
997						prevAllocation, err := state.AllocByID(ws, alloc.PreviousAllocation)
998						if err != nil {
999							return err
1000						}
1001
1002						if prevAllocation != nil && prevAllocation.NodeID != alloc.NodeID {
1003							allocNode, err := state.NodeByID(ws, prevAllocation.NodeID)
1004							if err != nil {
1005								return err
1006							}
1007							if allocNode == nil {
1008								// Node must have been GC'd so skip the token
1009								continue
1010							}
1011
1012							token, err := structs.GenerateMigrateToken(prevAllocation.ID, allocNode.SecretID)
1013							if err != nil {
1014								return err
1015							}
1016							reply.MigrateTokens[alloc.ID] = token
1017						}
1018					}
1019
1020					reply.Index = maxUint64(reply.Index, alloc.ModifyIndex)
1021				}
1022
1023				// Determine if we have less allocations than before. This
1024				// indicates there was a garbage collection
1025				if numAllocs < numOldAllocs {
1026					preferTableIndex = true
1027				}
1028
1029				// Store the new number of allocations
1030				numOldAllocs = numAllocs
1031			}
1032
1033			if preferTableIndex {
1034				// Use the last index that affected the nodes table
1035				index, err := state.Index("allocs")
1036				if err != nil {
1037					return err
1038				}
1039
1040				// Must provide non-zero index to prevent blocking
1041				// Index 1 is impossible anyways (due to Raft internals)
1042				if index == 0 {
1043					reply.Index = 1
1044				} else {
1045					reply.Index = index
1046				}
1047			}
1048			return nil
1049		}}
1050	return n.srv.blockingRPC(&opts)
1051}
1052
1053// UpdateAlloc is used to update the client status of an allocation
1054func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.GenericResponse) error {
1055	if done, err := n.srv.forward("Node.UpdateAlloc", args, args, reply); done {
1056		return err
1057	}
1058	defer metrics.MeasureSince([]string{"nomad", "client", "update_alloc"}, time.Now())
1059
1060	// Ensure at least a single alloc
1061	if len(args.Alloc) == 0 {
1062		return fmt.Errorf("must update at least one allocation")
1063	}
1064
1065	// Ensure that evals aren't set from client RPCs
1066	// We create them here before the raft update
1067	if len(args.Evals) != 0 {
1068		return fmt.Errorf("evals field must not be set")
1069	}
1070
1071	// Update modified timestamp for client initiated allocation updates
1072	now := time.Now()
1073	var evals []*structs.Evaluation
1074
1075	for _, allocToUpdate := range args.Alloc {
1076		allocToUpdate.ModifyTime = now.UTC().UnixNano()
1077
1078		if !allocToUpdate.TerminalStatus() {
1079			continue
1080		}
1081
1082		alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
1083		if alloc == nil {
1084			continue
1085		}
1086
1087		// if the job has been purged, this will always return error
1088		job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
1089		if err != nil {
1090			n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
1091			continue
1092		}
1093		if job == nil {
1094			n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID)
1095			continue
1096		}
1097
1098		taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
1099		if taskGroup == nil {
1100			continue
1101		}
1102
1103		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
1104		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
1105			eval := &structs.Evaluation{
1106				ID:          uuid.Generate(),
1107				Namespace:   alloc.Namespace,
1108				TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
1109				JobID:       alloc.JobID,
1110				Type:        job.Type,
1111				Priority:    job.Priority,
1112				Status:      structs.EvalStatusPending,
1113				CreateTime:  now.UTC().UnixNano(),
1114				ModifyTime:  now.UTC().UnixNano(),
1115			}
1116			evals = append(evals, eval)
1117		}
1118	}
1119
1120	// Add this to the batch
1121	n.updatesLock.Lock()
1122	n.updates = append(n.updates, args.Alloc...)
1123	n.evals = append(n.evals, evals...)
1124
1125	// Start a new batch if none
1126	future := n.updateFuture
1127	if future == nil {
1128		future = structs.NewBatchFuture()
1129		n.updateFuture = future
1130		n.updateTimer = time.AfterFunc(batchUpdateInterval, func() {
1131			// Get the pending updates
1132			n.updatesLock.Lock()
1133			updates := n.updates
1134			evals := n.evals
1135			future := n.updateFuture
1136
1137			// Assume future update patterns will be similar to
1138			// current batch and set cap appropriately to avoid
1139			// slice resizing.
1140			n.updates = make([]*structs.Allocation, 0, len(updates))
1141			n.evals = make([]*structs.Evaluation, 0, len(evals))
1142
1143			n.updateFuture = nil
1144			n.updateTimer = nil
1145			n.updatesLock.Unlock()
1146
1147			// Perform the batch update
1148			n.batchUpdate(future, updates, evals)
1149		})
1150	}
1151	n.updatesLock.Unlock()
1152
1153	// Wait for the future
1154	if err := future.Wait(); err != nil {
1155		return err
1156	}
1157
1158	// Setup the response
1159	reply.Index = future.Index()
1160	return nil
1161}
1162
1163// batchUpdate is used to update all the allocations
1164func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) {
1165	// Group pending evals by jobID to prevent creating unnecessary evals
1166	evalsByJobId := make(map[structs.NamespacedID]struct{})
1167	var trimmedEvals []*structs.Evaluation
1168	for _, eval := range evals {
1169		namespacedID := structs.NamespacedID{
1170			ID:        eval.JobID,
1171			Namespace: eval.Namespace,
1172		}
1173		_, exists := evalsByJobId[namespacedID]
1174		if !exists {
1175			now := time.Now().UTC().UnixNano()
1176			eval.CreateTime = now
1177			eval.ModifyTime = now
1178			trimmedEvals = append(trimmedEvals, eval)
1179			evalsByJobId[namespacedID] = struct{}{}
1180		}
1181	}
1182
1183	if len(trimmedEvals) > 0 {
1184		n.logger.Debug("adding evaluations for rescheduling failed allocations", "num_evals", len(trimmedEvals))
1185	}
1186	// Prepare the batch update
1187	batch := &structs.AllocUpdateRequest{
1188		Alloc:        updates,
1189		Evals:        trimmedEvals,
1190		WriteRequest: structs.WriteRequest{Region: n.srv.config.Region},
1191	}
1192
1193	// Commit this update via Raft
1194	var mErr multierror.Error
1195	_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch)
1196	if err != nil {
1197		n.logger.Error("alloc update failed", "error", err)
1198		mErr.Errors = append(mErr.Errors, err)
1199	}
1200
1201	// For each allocation we are updating, check if we should revoke any
1202	// - Vault token accessors
1203	// - Service Identity token accessors
1204	var (
1205		revokeVault []*structs.VaultAccessor
1206		revokeSI    []*structs.SITokenAccessor
1207	)
1208
1209	for _, alloc := range updates {
1210		// Skip any allocation that isn't dead on the client
1211		if !alloc.Terminated() {
1212			continue
1213		}
1214
1215		ws := memdb.NewWatchSet()
1216
1217		// Determine if there are any orphaned Vault accessors for the allocation
1218		if accessors, err := n.srv.State().VaultAccessorsByAlloc(ws, alloc.ID); err != nil {
1219			n.logger.Error("looking up vault accessors for alloc failed", "alloc_id", alloc.ID, "error", err)
1220			mErr.Errors = append(mErr.Errors, err)
1221		} else {
1222			revokeVault = append(revokeVault, accessors...)
1223		}
1224
1225		// Determine if there are any orphaned SI accessors for the allocation
1226		if accessors, err := n.srv.State().SITokenAccessorsByAlloc(ws, alloc.ID); err != nil {
1227			n.logger.Error("looking up si accessors for alloc failed", "alloc_id", alloc.ID, "error", err)
1228			mErr.Errors = append(mErr.Errors, err)
1229		} else {
1230			revokeSI = append(revokeSI, accessors...)
1231		}
1232	}
1233
1234	// Revoke any orphaned Vault token accessors
1235	if l := len(revokeVault); l > 0 {
1236		n.logger.Debug("revoking vault accessors due to terminal allocations", "num_accessors", l)
1237		if err := n.srv.vault.RevokeTokens(context.Background(), revokeVault, true); err != nil {
1238			n.logger.Error("batched vault accessor revocation failed", "error", err)
1239			mErr.Errors = append(mErr.Errors, err)
1240		}
1241	}
1242
1243	// Revoke any orphaned SI token accessors
1244	if l := len(revokeSI); l > 0 {
1245		n.logger.Debug("revoking si accessors due to terminal allocations", "num_accessors", l)
1246		_ = n.srv.consulACLs.RevokeTokens(context.Background(), revokeSI, true)
1247	}
1248
1249	// Respond to the future
1250	future.Respond(index, mErr.ErrorOrNil())
1251}
1252
1253// List is used to list the available nodes
1254func (n *Node) List(args *structs.NodeListRequest,
1255	reply *structs.NodeListResponse) error {
1256	if done, err := n.srv.forward("Node.List", args, args, reply); done {
1257		return err
1258	}
1259	defer metrics.MeasureSince([]string{"nomad", "client", "list"}, time.Now())
1260
1261	// Check node read permissions
1262	if aclObj, err := n.srv.ResolveToken(args.AuthToken); err != nil {
1263		return err
1264	} else if aclObj != nil && !aclObj.AllowNodeRead() {
1265		return structs.ErrPermissionDenied
1266	}
1267
1268	// Setup the blocking query
1269	opts := blockingOptions{
1270		queryOpts: &args.QueryOptions,
1271		queryMeta: &reply.QueryMeta,
1272		run: func(ws memdb.WatchSet, state *state.StateStore) error {
1273			// Capture all the nodes
1274			var err error
1275			var iter memdb.ResultIterator
1276			if prefix := args.QueryOptions.Prefix; prefix != "" {
1277				iter, err = state.NodesByIDPrefix(ws, prefix)
1278			} else {
1279				iter, err = state.Nodes(ws)
1280			}
1281			if err != nil {
1282				return err
1283			}
1284
1285			var nodes []*structs.NodeListStub
1286			for {
1287				raw := iter.Next()
1288				if raw == nil {
1289					break
1290				}
1291				node := raw.(*structs.Node)
1292				nodes = append(nodes, node.Stub(args.Fields))
1293			}
1294			reply.Nodes = nodes
1295
1296			// Use the last index that affected the jobs table
1297			index, err := state.Index("nodes")
1298			if err != nil {
1299				return err
1300			}
1301			reply.Index = index
1302
1303			// Set the query response
1304			n.srv.setQueryMeta(&reply.QueryMeta)
1305			return nil
1306		}}
1307	return n.srv.blockingRPC(&opts)
1308}
1309
1310// createNodeEvals is used to create evaluations for each alloc on a node.
1311// Each Eval is scoped to a job, so we need to potentially trigger many evals.
1312func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint64, error) {
1313	// Snapshot the state
1314	snap, err := n.srv.fsm.State().Snapshot()
1315	if err != nil {
1316		return nil, 0, fmt.Errorf("failed to snapshot state: %v", err)
1317	}
1318
1319	// Find all the allocations for this node
1320	ws := memdb.NewWatchSet()
1321	allocs, err := snap.AllocsByNode(ws, nodeID)
1322	if err != nil {
1323		return nil, 0, fmt.Errorf("failed to find allocs for '%s': %v", nodeID, err)
1324	}
1325
1326	sysJobsIter, err := snap.JobsByScheduler(ws, "system")
1327	if err != nil {
1328		return nil, 0, fmt.Errorf("failed to find system jobs for '%s': %v", nodeID, err)
1329	}
1330
1331	var sysJobs []*structs.Job
1332	for job := sysJobsIter.Next(); job != nil; job = sysJobsIter.Next() {
1333		sysJobs = append(sysJobs, job.(*structs.Job))
1334	}
1335
1336	// Fast-path if nothing to do
1337	if len(allocs) == 0 && len(sysJobs) == 0 {
1338		return nil, 0, nil
1339	}
1340
1341	// Create an eval for each JobID affected
1342	var evals []*structs.Evaluation
1343	var evalIDs []string
1344	jobIDs := make(map[string]struct{})
1345	now := time.Now().UTC().UnixNano()
1346
1347	for _, alloc := range allocs {
1348		// Deduplicate on JobID
1349		if _, ok := jobIDs[alloc.JobID]; ok {
1350			continue
1351		}
1352		jobIDs[alloc.JobID] = struct{}{}
1353
1354		// Create a new eval
1355		eval := &structs.Evaluation{
1356			ID:              uuid.Generate(),
1357			Namespace:       alloc.Namespace,
1358			Priority:        alloc.Job.Priority,
1359			Type:            alloc.Job.Type,
1360			TriggeredBy:     structs.EvalTriggerNodeUpdate,
1361			JobID:           alloc.JobID,
1362			NodeID:          nodeID,
1363			NodeModifyIndex: nodeIndex,
1364			Status:          structs.EvalStatusPending,
1365			CreateTime:      now,
1366			ModifyTime:      now,
1367		}
1368		evals = append(evals, eval)
1369		evalIDs = append(evalIDs, eval.ID)
1370	}
1371
1372	// Create an evaluation for each system job.
1373	for _, job := range sysJobs {
1374		// Still dedup on JobID as the node may already have the system job.
1375		if _, ok := jobIDs[job.ID]; ok {
1376			continue
1377		}
1378		jobIDs[job.ID] = struct{}{}
1379
1380		// Create a new eval
1381		eval := &structs.Evaluation{
1382			ID:              uuid.Generate(),
1383			Namespace:       job.Namespace,
1384			Priority:        job.Priority,
1385			Type:            job.Type,
1386			TriggeredBy:     structs.EvalTriggerNodeUpdate,
1387			JobID:           job.ID,
1388			NodeID:          nodeID,
1389			NodeModifyIndex: nodeIndex,
1390			Status:          structs.EvalStatusPending,
1391			CreateTime:      now,
1392			ModifyTime:      now,
1393		}
1394		evals = append(evals, eval)
1395		evalIDs = append(evalIDs, eval.ID)
1396	}
1397
1398	// Create the Raft transaction
1399	update := &structs.EvalUpdateRequest{
1400		Evals:        evals,
1401		WriteRequest: structs.WriteRequest{Region: n.srv.config.Region},
1402	}
1403
1404	// Commit this evaluation via Raft
1405	// XXX: There is a risk of partial failure where the node update succeeds
1406	// but that the EvalUpdate does not.
1407	_, evalIndex, err := n.srv.raftApply(structs.EvalUpdateRequestType, update)
1408	if err != nil {
1409		return nil, 0, err
1410	}
1411	return evalIDs, evalIndex, nil
1412}
1413
1414// DeriveVaultToken is used by the clients to request wrapped Vault tokens for
1415// tasks
1416func (n *Node) DeriveVaultToken(args *structs.DeriveVaultTokenRequest, reply *structs.DeriveVaultTokenResponse) error {
1417	setError := func(e error, recoverable bool) {
1418		if e != nil {
1419			if re, ok := e.(*structs.RecoverableError); ok {
1420				reply.Error = re // No need to wrap if error is already a RecoverableError
1421			} else {
1422				reply.Error = structs.NewRecoverableError(e, recoverable).(*structs.RecoverableError)
1423			}
1424			n.logger.Error("DeriveVaultToken failed", "recoverable", recoverable, "error", e)
1425		}
1426	}
1427
1428	if done, err := n.srv.forward("Node.DeriveVaultToken", args, args, reply); done {
1429		setError(err, structs.IsRecoverable(err) || err == structs.ErrNoLeader)
1430		return nil
1431	}
1432	defer metrics.MeasureSince([]string{"nomad", "client", "derive_vault_token"}, time.Now())
1433
1434	// Verify the arguments
1435	if args.NodeID == "" {
1436		setError(fmt.Errorf("missing node ID"), false)
1437		return nil
1438	}
1439	if args.SecretID == "" {
1440		setError(fmt.Errorf("missing node SecretID"), false)
1441		return nil
1442	}
1443	if args.AllocID == "" {
1444		setError(fmt.Errorf("missing allocation ID"), false)
1445		return nil
1446	}
1447	if len(args.Tasks) == 0 {
1448		setError(fmt.Errorf("no tasks specified"), false)
1449		return nil
1450	}
1451
1452	// Verify the following:
1453	// * The Node exists and has the correct SecretID
1454	// * The Allocation exists on the specified Node
1455	// * The Allocation contains the given tasks and they each require Vault
1456	//   tokens
1457	snap, err := n.srv.fsm.State().Snapshot()
1458	if err != nil {
1459		setError(err, false)
1460		return nil
1461	}
1462	ws := memdb.NewWatchSet()
1463	node, err := snap.NodeByID(ws, args.NodeID)
1464	if err != nil {
1465		setError(err, false)
1466		return nil
1467	}
1468	if node == nil {
1469		setError(fmt.Errorf("Node %q does not exist", args.NodeID), false)
1470		return nil
1471	}
1472	if node.SecretID != args.SecretID {
1473		setError(fmt.Errorf("SecretID mismatch"), false)
1474		return nil
1475	}
1476
1477	alloc, err := snap.AllocByID(ws, args.AllocID)
1478	if err != nil {
1479		setError(err, false)
1480		return nil
1481	}
1482	if alloc == nil {
1483		setError(fmt.Errorf("Allocation %q does not exist", args.AllocID), false)
1484		return nil
1485	}
1486	if alloc.NodeID != args.NodeID {
1487		setError(fmt.Errorf("Allocation %q not running on Node %q", args.AllocID, args.NodeID), false)
1488		return nil
1489	}
1490	if alloc.TerminalStatus() {
1491		setError(fmt.Errorf("Can't request Vault token for terminal allocation"), false)
1492		return nil
1493	}
1494
1495	// Check the policies
1496	policies := alloc.Job.VaultPolicies()
1497	if policies == nil {
1498		setError(fmt.Errorf("Job doesn't require Vault policies"), false)
1499		return nil
1500	}
1501	tg, ok := policies[alloc.TaskGroup]
1502	if !ok {
1503		setError(fmt.Errorf("Task group does not require Vault policies"), false)
1504		return nil
1505	}
1506
1507	var unneeded []string
1508	for _, task := range args.Tasks {
1509		taskVault := tg[task]
1510		if taskVault == nil || len(taskVault.Policies) == 0 {
1511			unneeded = append(unneeded, task)
1512		}
1513	}
1514
1515	if len(unneeded) != 0 {
1516		e := fmt.Errorf("Requested Vault tokens for tasks without defined Vault policies: %s",
1517			strings.Join(unneeded, ", "))
1518		setError(e, false)
1519		return nil
1520	}
1521
1522	// At this point the request is valid and we should contact Vault for
1523	// tokens.
1524
1525	// Create an error group where we will spin up a fixed set of goroutines to
1526	// handle deriving tokens but where if any fails the whole group is
1527	// canceled.
1528	g, ctx := errgroup.WithContext(context.Background())
1529
1530	// Cap the handlers
1531	handlers := len(args.Tasks)
1532	if handlers > maxParallelRequestsPerDerive {
1533		handlers = maxParallelRequestsPerDerive
1534	}
1535
1536	// Create the Vault Tokens
1537	input := make(chan string, handlers)
1538	results := make(map[string]*vapi.Secret, len(args.Tasks))
1539	for i := 0; i < handlers; i++ {
1540		g.Go(func() error {
1541			for {
1542				select {
1543				case task, ok := <-input:
1544					if !ok {
1545						return nil
1546					}
1547
1548					secret, err := n.srv.vault.CreateToken(ctx, alloc, task)
1549					if err != nil {
1550						return err
1551					}
1552
1553					results[task] = secret
1554				case <-ctx.Done():
1555					return nil
1556				}
1557			}
1558		})
1559	}
1560
1561	// Send the input
1562	go func() {
1563		defer close(input)
1564		for _, task := range args.Tasks {
1565			select {
1566			case <-ctx.Done():
1567				return
1568			case input <- task:
1569			}
1570		}
1571	}()
1572
1573	// Wait for everything to complete or for an error
1574	createErr := g.Wait()
1575
1576	// Retrieve the results
1577	accessors := make([]*structs.VaultAccessor, 0, len(results))
1578	tokens := make(map[string]string, len(results))
1579	for task, secret := range results {
1580		w := secret.WrapInfo
1581		tokens[task] = w.Token
1582		accessor := &structs.VaultAccessor{
1583			Accessor:    w.WrappedAccessor,
1584			Task:        task,
1585			NodeID:      alloc.NodeID,
1586			AllocID:     alloc.ID,
1587			CreationTTL: w.TTL,
1588		}
1589
1590		accessors = append(accessors, accessor)
1591	}
1592
1593	// If there was an error revoke the created tokens
1594	if createErr != nil {
1595		n.logger.Error("Vault token creation for alloc failed", "alloc_id", alloc.ID, "error", createErr)
1596
1597		if revokeErr := n.srv.vault.RevokeTokens(context.Background(), accessors, false); revokeErr != nil {
1598			n.logger.Error("Vault token revocation for alloc failed", "alloc_id", alloc.ID, "error", revokeErr)
1599		}
1600
1601		if rerr, ok := createErr.(*structs.RecoverableError); ok {
1602			reply.Error = rerr
1603		} else {
1604			reply.Error = structs.NewRecoverableError(createErr, false).(*structs.RecoverableError)
1605		}
1606
1607		return nil
1608	}
1609
1610	// Commit to Raft before returning any of the tokens
1611	req := structs.VaultAccessorsRequest{Accessors: accessors}
1612	_, index, err := n.srv.raftApply(structs.VaultAccessorRegisterRequestType, &req)
1613	if err != nil {
1614		n.logger.Error("registering Vault accessors for alloc failed", "alloc_id", alloc.ID, "error", err)
1615
1616		// Determine if we can recover from the error
1617		retry := false
1618		switch err {
1619		case raft.ErrNotLeader, raft.ErrLeadershipLost, raft.ErrRaftShutdown, raft.ErrEnqueueTimeout:
1620			retry = true
1621		}
1622
1623		setError(err, retry)
1624		return nil
1625	}
1626
1627	reply.Index = index
1628	reply.Tasks = tokens
1629	n.srv.setQueryMeta(&reply.QueryMeta)
1630	return nil
1631}
1632
// connectTask pairs a task's name with its kind. connectTasks builds one for
// every requested task that uses Connect, and DeriveSIToken passes both fields
// into the ServiceIdentityRequest used to create that task's token.
type connectTask struct {
	// TaskKind is copied from the Kind field of the task found in the task group.
	TaskKind structs.TaskKind
	// TaskName is the task name exactly as it was requested.
	TaskName string
}
1637
1638func (n *Node) DeriveSIToken(args *structs.DeriveSITokenRequest, reply *structs.DeriveSITokenResponse) error {
1639	setError := func(e error, recoverable bool) {
1640		if e != nil {
1641			if re, ok := e.(*structs.RecoverableError); ok {
1642				reply.Error = re // No need to wrap if error is already a RecoverableError
1643			} else {
1644				reply.Error = structs.NewRecoverableError(e, recoverable).(*structs.RecoverableError)
1645			}
1646			n.logger.Error("DeriveSIToken failed", "recoverable", recoverable, "error", e)
1647		}
1648	}
1649
1650	if done, err := n.srv.forward("Node.DeriveSIToken", args, args, reply); done {
1651		setError(err, structs.IsRecoverable(err) || err == structs.ErrNoLeader)
1652		return nil
1653	}
1654	defer metrics.MeasureSince([]string{"nomad", "client", "derive_si_token"}, time.Now())
1655
1656	// Verify the arguments
1657	if err := args.Validate(); err != nil {
1658		setError(err, false)
1659		return nil
1660	}
1661
1662	// Get the ClusterID
1663	clusterID, err := n.srv.ClusterID()
1664	if err != nil {
1665		setError(err, false)
1666		return nil
1667	}
1668
1669	// Verify the following:
1670	// * The Node exists and has the correct SecretID.
1671	// * The Allocation exists on the specified Node.
1672	// * The Allocation contains the given tasks, and each task requires a
1673	//   SI token.
1674
1675	snap, err := n.srv.fsm.State().Snapshot()
1676	if err != nil {
1677		setError(err, false)
1678		return nil
1679	}
1680	node, err := snap.NodeByID(nil, args.NodeID)
1681	if err != nil {
1682		setError(err, false)
1683		return nil
1684	}
1685	if node == nil {
1686		setError(errors.Errorf("Node %q does not exist", args.NodeID), false)
1687		return nil
1688	}
1689	if node.SecretID != args.SecretID {
1690		setError(errors.Errorf("SecretID mismatch"), false)
1691		return nil
1692	}
1693
1694	alloc, err := snap.AllocByID(nil, args.AllocID)
1695	if err != nil {
1696		setError(err, false)
1697		return nil
1698	}
1699	if alloc == nil {
1700		setError(errors.Errorf("Allocation %q does not exist", args.AllocID), false)
1701		return nil
1702	}
1703	if alloc.NodeID != args.NodeID {
1704		setError(errors.Errorf("Allocation %q not running on node %q", args.AllocID, args.NodeID), false)
1705		return nil
1706	}
1707	if alloc.TerminalStatus() {
1708		setError(errors.Errorf("Cannot request SI token for terminal allocation"), false)
1709		return nil
1710	}
1711
1712	// make sure task group contains at least one connect enabled service
1713	tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
1714	if tg == nil {
1715		setError(errors.Errorf("Allocation %q does not contain TaskGroup %q", args.AllocID, alloc.TaskGroup), false)
1716		return nil
1717	}
1718	if !tg.UsesConnect() {
1719		setError(errors.Errorf("TaskGroup %q does not use Connect", tg.Name), false)
1720		return nil
1721	}
1722
1723	// make sure each task in args.Tasks is a connect-enabled task
1724	notConnect, tasks := connectTasks(tg, args.Tasks)
1725	if len(notConnect) > 0 {
1726		setError(fmt.Errorf(
1727			"Requested Consul Service Identity tokens for tasks that are not Connect enabled: %v",
1728			strings.Join(notConnect, ", "),
1729		), false)
1730	}
1731
1732	// At this point the request is valid and we should contact Consul for tokens.
1733
1734	// A lot of the following is copied from DeriveVaultToken which has been
1735	// working fine for years.
1736
1737	// Create an error group where we will spin up a fixed set of goroutines to
1738	// handle deriving tokens but where if any fails the whole group is
1739	// canceled.
1740	g, ctx := errgroup.WithContext(context.Background())
1741
1742	// Cap the worker threads
1743	numWorkers := len(args.Tasks)
1744	if numWorkers > maxParallelRequestsPerDerive {
1745		numWorkers = maxParallelRequestsPerDerive
1746	}
1747
1748	// would like to pull some of this out...
1749
1750	// Create the SI tokens from a slice of task name + connect service
1751	input := make(chan connectTask, numWorkers)
1752	results := make(map[string]*structs.SIToken, numWorkers)
1753	for i := 0; i < numWorkers; i++ {
1754		g.Go(func() error {
1755			for {
1756				select {
1757				case task, ok := <-input:
1758					if !ok {
1759						return nil
1760					}
1761					secret, err := n.srv.consulACLs.CreateToken(ctx, ServiceIdentityRequest{
1762						ConsulNamespace: tg.Consul.GetNamespace(),
1763						TaskKind:        task.TaskKind,
1764						TaskName:        task.TaskName,
1765						ClusterID:       clusterID,
1766						AllocID:         alloc.ID,
1767					})
1768					if err != nil {
1769						return err
1770					}
1771					results[task.TaskName] = secret
1772				case <-ctx.Done():
1773					return nil
1774				}
1775			}
1776		})
1777	}
1778
1779	// Send the input
1780	go func() {
1781		defer close(input)
1782		for _, connectTask := range tasks {
1783			select {
1784			case <-ctx.Done():
1785				return
1786			case input <- connectTask:
1787			}
1788		}
1789	}()
1790
1791	// Wait for everything to complete or for an error
1792	createErr := g.Wait()
1793
1794	accessors := make([]*structs.SITokenAccessor, 0, len(results))
1795	tokens := make(map[string]string, len(results))
1796	for task, secret := range results {
1797		tokens[task] = secret.SecretID
1798		accessor := &structs.SITokenAccessor{
1799			ConsulNamespace: tg.Consul.GetNamespace(),
1800			NodeID:          alloc.NodeID,
1801			AllocID:         alloc.ID,
1802			TaskName:        task,
1803			AccessorID:      secret.AccessorID,
1804		}
1805		accessors = append(accessors, accessor)
1806	}
1807
1808	// If there was an error, revoke all created tokens. These tokens have not
1809	// yet been committed to the persistent store.
1810	if createErr != nil {
1811		n.logger.Error("Consul Service Identity token creation for alloc failed", "alloc_id", alloc.ID, "error", createErr)
1812		_ = n.srv.consulACLs.RevokeTokens(context.Background(), accessors, false)
1813
1814		if recoverable, ok := createErr.(*structs.RecoverableError); ok {
1815			reply.Error = recoverable
1816		} else {
1817			reply.Error = structs.NewRecoverableError(createErr, false).(*structs.RecoverableError)
1818		}
1819
1820		return nil
1821	}
1822
1823	// Commit the derived tokens to raft before returning them
1824	requested := structs.SITokenAccessorsRequest{Accessors: accessors}
1825	_, index, err := n.srv.raftApply(structs.ServiceIdentityAccessorRegisterRequestType, &requested)
1826	if err != nil {
1827		n.logger.Error("registering Service Identity token accessors for alloc failed", "alloc_id", alloc.ID, "error", err)
1828
1829		// Determine if we can recover from the error
1830		retry := false
1831		switch err {
1832		case raft.ErrNotLeader, raft.ErrLeadershipLost, raft.ErrRaftShutdown, raft.ErrEnqueueTimeout:
1833			retry = true
1834		}
1835		setError(err, retry)
1836		return nil
1837	}
1838
1839	// We made it! Now we can set the reply.
1840	reply.Index = index
1841	reply.Tokens = tokens
1842	n.srv.setQueryMeta(&reply.QueryMeta)
1843	return nil
1844}
1845
1846func connectTasks(tg *structs.TaskGroup, tasks []string) ([]string, []connectTask) {
1847	var notConnect []string
1848	var usesConnect []connectTask
1849	for _, task := range tasks {
1850		tgTask := tg.LookupTask(task)
1851		if !taskUsesConnect(tgTask) {
1852			notConnect = append(notConnect, task)
1853		} else {
1854			usesConnect = append(usesConnect, connectTask{
1855				TaskName: task,
1856				TaskKind: tgTask.Kind,
1857			})
1858		}
1859	}
1860	return notConnect, usesConnect
1861}
1862
1863func taskUsesConnect(task *structs.Task) bool {
1864	if task == nil {
1865		// not even in the task group
1866		return false
1867	}
1868	return task.UsesConnect()
1869}
1870
1871func (n *Node) EmitEvents(args *structs.EmitNodeEventsRequest, reply *structs.EmitNodeEventsResponse) error {
1872	if done, err := n.srv.forward("Node.EmitEvents", args, args, reply); done {
1873		return err
1874	}
1875	defer metrics.MeasureSince([]string{"nomad", "client", "emit_events"}, time.Now())
1876
1877	if len(args.NodeEvents) == 0 {
1878		return fmt.Errorf("no node events given")
1879	}
1880	for nodeID, events := range args.NodeEvents {
1881		if len(events) == 0 {
1882			return fmt.Errorf("no node events given for node %q", nodeID)
1883		}
1884	}
1885
1886	_, index, err := n.srv.raftApply(structs.UpsertNodeEventsType, args)
1887	if err != nil {
1888		n.logger.Error("upserting node events failed", "error", err)
1889		return err
1890	}
1891
1892	reply.Index = index
1893	return nil
1894}
1895