package nomad

import (
	"errors"
	"fmt"
	"io"
	"net"
	"time"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-msgpack/codec"

	"github.com/hashicorp/consul/agent/consul/autopilot"
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/helper/snapshot"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/raft"
	"github.com/hashicorp/serf/serf"
)

// Operator endpoint is used to perform low-level operator tasks for Nomad.
type Operator struct {
	srv    *Server
	logger log.Logger
}

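// register registers the Operator streaming RPC handlers (snapshot save and
// restore) with the server's streaming RPC registry.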
func (op *Operator) register() {
	op.srv.streamingRpcs.Register("Operator.SnapshotSave", op.snapshotSave)
	op.srv.streamingRpcs.Register("Operator.SnapshotRestore", op.snapshotRestore)
}

// RaftGetConfiguration is used to retrieve the current Raft configuration.
func (op *Operator) RaftGetConfiguration(args *structs.GenericRequest, reply *structs.RaftConfigurationResponse) error {
	if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done {
		return err
	}

	// Check management permissions
	if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil {
		return err
	} else if aclObj != nil && !aclObj.IsManagement() {
		return structs.ErrPermissionDenied
	}

	// We can't fetch the leader and the configuration atomically with
	// the current Raft API.
	future := op.srv.raft.GetConfiguration()
	if err := future.Error(); err != nil {
		return err
	}

	// Index the Nomad information about the servers.
	serverMap := make(map[raft.ServerAddress]serf.Member)
	for _, member := range op.srv.serf.Members() {
		valid, parts := isNomadServer(member)
		if !valid {
			continue
		}

		addr := (&net.TCPAddr{IP: member.Addr, Port: parts.Port}).String()
		serverMap[raft.ServerAddress(addr)] = member
	}

	// Fill out the reply.
	leader := op.srv.raft.Leader()
	reply.Index = future.Index()
	for _, server := range future.Configuration().Servers {
		node := "(unknown)"
		raftProtocolVersion := "unknown"
		if member, ok := serverMap[server.Address]; ok {
			node = member.Name
			if raftVsn, ok := member.Tags["raft_vsn"]; ok {
				raftProtocolVersion = raftVsn
			}
		}

		entry := &structs.RaftServer{
			ID:           server.ID,
			Node:         node,
			Address:      server.Address,
			Leader:       server.Address == leader,
			Voter:        server.Suffrage == raft.Voter,
			RaftProtocol: raftProtocolVersion,
		}
		reply.Servers = append(reply.Servers, entry)
	}
	return nil
}

// RaftRemovePeerByAddress is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by address in the form of
// "IP:port". The reply argument is not used, but is required to fulfill the RPC
// interface.
func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftPeerByAddressRequest, reply *struct{}) error {
	if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done {
		return err
	}

	// Check management permissions
	if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil {
		return err
	} else if aclObj != nil && !aclObj.IsManagement() {
		return structs.ErrPermissionDenied
	}

	// Since this is an operation designed for humans to use, we will return
	// an error if the supplied address isn't among the peers since it's
	// likely they screwed up.
	{
		future := op.srv.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return err
		}
		for _, s := range future.Configuration().Servers {
			if s.Address == args.Address {
				goto REMOVE
			}
		}
		return fmt.Errorf("address %q was not found in the Raft configuration",
			args.Address)
	}

REMOVE:
	// The Raft library itself will prevent various forms of foot-shooting,
	// like making a configuration with no voters. Some consideration was
	// given here to adding more checks, but it was decided to make this as
	// low-level and direct as possible. We've got ACL coverage to lock this
	// down, and if you are an operator, it's assumed you know what you are
	// doing if you are calling this. If you remove a peer that's known to
	// Serf, for example, it will come back when the leader does a reconcile
	// pass.
	future := op.srv.raft.RemovePeer(args.Address)
	if err := future.Error(); err != nil {
		op.logger.Warn("failed to remove Raft peer", "peer", args.Address, "error", err)
		return err
	}

	op.logger.Warn("removed Raft peer", "peer", args.Address)
	return nil
}

// RaftRemovePeerByID is used to kick a stale peer (one that is in the Raft
// quorum but no longer known to Serf or the catalog) by ID. The reply argument
// is not used, but is required to fulfill the RPC interface.
func (op *Operator) RaftRemovePeerByID(args *structs.RaftPeerByIDRequest, reply *struct{}) error {
	if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done {
		return err
	}

	// Check management permissions
	if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil {
		return err
	} else if aclObj != nil && !aclObj.IsManagement() {
		return structs.ErrPermissionDenied
	}

	// Since this is an operation designed for humans to use, we will return
	// an error if the supplied ID isn't among the peers since it's
	// likely they screwed up.
	var address raft.ServerAddress
	{
		future := op.srv.raft.GetConfiguration()
		if err := future.Error(); err != nil {
			return err
		}
		for _, s := range future.Configuration().Servers {
			if s.ID == args.ID {
				address = s.Address
				goto REMOVE
			}
		}
		return fmt.Errorf("id %q was not found in the Raft configuration",
			args.ID)
	}

REMOVE:
	// The Raft library itself will prevent various forms of foot-shooting,
	// like making a configuration with no voters. Some consideration was
	// given here to adding more checks, but it was decided to make this as
	// low-level and direct as possible. We've got ACL coverage to lock this
	// down, and if you are an operator, it's assumed you know what you are
	// doing if you are calling this. If you remove a peer that's known to
	// Serf, for example, it will come back when the leader does a reconcile
	// pass.
	minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
	if err != nil {
		return err
	}

	var future raft.Future
	if minRaftProtocol >= 2 {
		future = op.srv.raft.RemoveServer(args.ID, 0, 0)
	} else {
		future = op.srv.raft.RemovePeer(address)
	}
	if err := future.Error(); err != nil {
		op.logger.Warn("failed to remove Raft peer", "peer_id", args.ID, "error", err)
		return err
	}

	op.logger.Warn("removed Raft peer", "peer_id", args.ID)
	return nil
}

// AutopilotGetConfiguration is used to retrieve the current Autopilot configuration.
func (op *Operator) AutopilotGetConfiguration(args *structs.GenericRequest, reply *structs.AutopilotConfig) error {
	if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator read access.
	rule, err := op.srv.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	}
	if rule != nil && !rule.AllowOperatorRead() {
		return structs.ErrPermissionDenied
	}

	state := op.srv.fsm.State()
	_, config, err := state.AutopilotConfig()
	if err != nil {
		return err
	}
	if config == nil {
		return fmt.Errorf("autopilot config not initialized yet")
	}

	*reply = *config

	return nil
}

// AutopilotSetConfiguration is used to set the current Autopilot configuration.
func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error {
	if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator write access.
	rule, err := op.srv.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	}
	if rule != nil && !rule.AllowOperatorWrite() {
		return structs.ErrPermissionDenied
	}

	// All servers should be at or above 0.8.0 to apply this operation
	if !ServersMeetMinimumVersion(op.srv.Members(), minAutopilotVersion, false) {
		return fmt.Errorf("All servers should be running version %v to update autopilot config", minAutopilotVersion)
	}

	// Apply the update
	resp, _, err := op.srv.raftApply(structs.AutopilotRequestType, args)
	if err != nil {
		op.logger.Error("failed applying AutoPilot configuration", "error", err)
		return err
	}
	if respErr, ok := resp.(error); ok {
		return respErr
	}

	// Check if the return type is a bool.
	if respBool, ok := resp.(bool); ok {
		*reply = respBool
	}
	return nil
}

// ServerHealth is used to get the current health of the servers.
func (op *Operator) ServerHealth(args *structs.GenericRequest, reply *autopilot.OperatorHealthReply) error {
	// This must be sent to the leader, so we fix the args since we are
	// re-using a structure where we don't support all the options.
	args.AllowStale = false
	if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done {
		return err
	}

	// This action requires operator read access.
	rule, err := op.srv.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	}
	if rule != nil && !rule.AllowOperatorRead() {
		return structs.ErrPermissionDenied
	}

	// Exit early if the min Raft version is too low
	minRaftProtocol, err := op.srv.autopilot.MinRaftProtocol()
	if err != nil {
		return fmt.Errorf("error getting server raft protocol versions: %s", err)
	}
	if minRaftProtocol < 3 {
		return fmt.Errorf("all servers must have raft_protocol set to 3 or higher to use this endpoint")
	}

	*reply = op.srv.autopilot.GetClusterHealth()

	return nil
}

// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *structs.SchedulerSetConfigurationResponse) error {
	if done, err := op.srv.forward("Operator.SchedulerSetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator write access.
	rule, err := op.srv.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	} else if rule != nil && !rule.AllowOperatorWrite() {
		return structs.ErrPermissionDenied
	}

	// All servers should be at or above 0.9.0 to apply this operation
	if !ServersMeetMinimumVersion(op.srv.Members(), minSchedulerConfigVersion, false) {
		return fmt.Errorf("All servers should be running version %v to update scheduler config", minSchedulerConfigVersion)
	}

	// Apply the update
	resp, index, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args)
	if err != nil {
		op.logger.Error("failed applying Scheduler configuration", "error", err)
		return err
	} else if respErr, ok := resp.(error); ok {
		return respErr
	}

	// If this was a CAS request, Raft returns a boolean indicating whether the
	// update was applied. Otherwise, assume success.
	reply.Updated = true
	if respBool, ok := resp.(bool); ok {
		reply.Updated = respBool
	}
	reply.Index = index
	return nil
}

// SchedulerGetConfiguration is used to retrieve the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(args *structs.GenericRequest, reply *structs.SchedulerConfigurationResponse) error {
	if done, err := op.srv.forward("Operator.SchedulerGetConfiguration", args, args, reply); done {
		return err
	}

	// This action requires operator read access.
	rule, err := op.srv.ResolveToken(args.AuthToken)
	if err != nil {
		return err
	} else if rule != nil && !rule.AllowOperatorRead() {
		return structs.ErrPermissionDenied
	}

	state := op.srv.fsm.State()
	index, config, err := state.SchedulerConfig()
	if err != nil {
		return err
	} else if config == nil {
		return fmt.Errorf("scheduler config not initialized yet")
	}

	reply.SchedulerConfig = config
	reply.QueryMeta.Index = index
	op.srv.setQueryMeta(&reply.QueryMeta)

	return nil
}

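// forwardStreamingRPC forwards a streaming RPC to a server in the given region
// and bridges the supplied connection to it.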
func (op *Operator) forwardStreamingRPC(region string, method string, args interface{}, in io.ReadWriteCloser) error {
	server, err := op.srv.findRegionServer(region)
	if err != nil {
		return err
	}

	return op.forwardStreamingRPCToServer(server, method, args, in)
}

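// forwardStreamingRPCToServer opens a streaming RPC connection to the given
// server, encodes the request arguments onto it, and then bridges the two
// connections until either side closes.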
func (op *Operator) forwardStreamingRPCToServer(server *serverParts, method string, args interface{}, in io.ReadWriteCloser) error {
	srvConn, err := op.srv.streamingRpc(server, method)
	if err != nil {
		return err
	}
	defer srvConn.Close()

	outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
	if err := outEncoder.Encode(args); err != nil {
		return err
	}

	structs.Bridge(in, srvConn)
	return nil
}

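// snapshotSave is the streaming RPC handler for Operator.SnapshotSave. It
// forwards the request to the correct region or leader when needed, enforces
// management ACLs, and then streams a snapshot of the Raft state back over
// the connection.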
func (op *Operator) snapshotSave(conn io.ReadWriteCloser) {
	defer conn.Close()

	var args structs.SnapshotSaveRequest
	var reply structs.SnapshotSaveResponse
	decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
	encoder := codec.NewEncoder(conn, structs.MsgpackHandle)

	handleFailure := func(code int, err error) {
		encoder.Encode(&structs.SnapshotSaveResponse{
			ErrorCode: code,
			ErrorMsg:  err.Error(),
		})
	}

	if err := decoder.Decode(&args); err != nil {
		handleFailure(500, err)
		return
	}

	// Forward to the appropriate region
	if args.Region != op.srv.Region() {
		err := op.forwardStreamingRPC(args.Region, "Operator.SnapshotSave", args, conn)
		if err != nil {
			handleFailure(500, err)
		}
		return
	}

	// Forward to the leader
	if !args.AllowStale {
		remoteServer, err := op.srv.getLeaderForRPC()
		if err != nil {
			handleFailure(500, err)
			return
		}
		if remoteServer != nil {
			err := op.forwardStreamingRPCToServer(remoteServer, "Operator.SnapshotSave", args, conn)
			if err != nil {
				handleFailure(500, err)
			}
			return
		}
	}

	// Check management permissions
	if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil {
		code := 500
		if err == structs.ErrTokenNotFound {
			code = 400
		}
		handleFailure(code, err)
		return
	} else if aclObj != nil && !aclObj.IsManagement() {
		handleFailure(403, structs.ErrPermissionDenied)
		return
	}

	op.srv.setQueryMeta(&reply.QueryMeta)

	// Take the snapshot and capture the index.
	snap, err := snapshot.New(op.logger.Named("snapshot"), op.srv.raft)
	reply.SnapshotChecksum = snap.Checksum()
	reply.Index = snap.Index()
	if err != nil {
		handleFailure(500, err)
		return
	}
	defer snap.Close()

	if err := encoder.Encode(&reply); err != nil {
		handleFailure(500, fmt.Errorf("failed to encode response: %v", err))
		return
	}
	if snap != nil {
		if _, err := io.Copy(conn, snap); err != nil {
			handleFailure(500, fmt.Errorf("failed to stream snapshot: %v", err))
		}
	}
}

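// snapshotRestore is the streaming RPC handler for Operator.SnapshotRestore.
// It forwards the request to the correct region or leader when needed,
// enforces management ACLs, restores the streamed snapshot into Raft, and
// then kicks the leader loop so leader state is rebuilt from the restored
// state store.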
func (op *Operator) snapshotRestore(conn io.ReadWriteCloser) {
	defer conn.Close()

	var args structs.SnapshotRestoreRequest
	var reply structs.SnapshotRestoreResponse
	decoder := codec.NewDecoder(conn, structs.MsgpackHandle)
	encoder := codec.NewEncoder(conn, structs.MsgpackHandle)

	handleFailure := func(code int, err error) {
		encoder.Encode(&structs.SnapshotRestoreResponse{
			ErrorCode: code,
			ErrorMsg:  err.Error(),
		})
	}

	if err := decoder.Decode(&args); err != nil {
		handleFailure(500, err)
		return
	}

	// Forward to the appropriate region
	if args.Region != op.srv.Region() {
		err := op.forwardStreamingRPC(args.Region, "Operator.SnapshotRestore", args, conn)
		if err != nil {
			handleFailure(500, err)
		}
		return
	}

	// Forward to the leader
	remoteServer, err := op.srv.getLeaderForRPC()
	if err != nil {
		handleFailure(500, err)
		return
	}
	if remoteServer != nil {
		err := op.forwardStreamingRPCToServer(remoteServer, "Operator.SnapshotRestore", args, conn)
		if err != nil {
			handleFailure(500, err)
		}
		return
	}

	// Check management permissions
	if aclObj, err := op.srv.ResolveToken(args.AuthToken); err != nil {
		code := 500
		if err == structs.ErrTokenNotFound {
			code = 400
		}
		handleFailure(code, err)
		return
	} else if aclObj != nil && !aclObj.IsManagement() {
		handleFailure(403, structs.ErrPermissionDenied)
		return
	}

	op.srv.setQueryMeta(&reply.QueryMeta)

	reader, errCh := decodeStreamOutput(decoder)

	err = snapshot.Restore(op.logger.Named("snapshot"), reader, op.srv.raft)
	if err != nil {
		handleFailure(500, fmt.Errorf("failed to restore from snapshot: %v", err))
		return
	}

	err = <-errCh
	if err != nil {
		handleFailure(400, fmt.Errorf("failed to read stream: %v", err))
		return
	}

	// This'll be used for feedback from the leader loop.
	timeoutCh := time.After(time.Minute)

	lerrCh := make(chan error, 1)

	select {
	// Reassert leader actions and update all leader related state
	// with new state store content.
	case op.srv.reassertLeaderCh <- lerrCh:

	// We might have lost leadership while waiting to kick the loop.
	case <-timeoutCh:
		handleFailure(500, fmt.Errorf("timed out waiting to re-run leader actions"))

	// Make sure we don't get stuck during shutdown
	case <-op.srv.shutdownCh:
	}

	select {
	// Wait for the leader loop to finish up.
	case err := <-lerrCh:
		if err != nil {
			handleFailure(500, err)
			return
		}

	// We might have lost leadership while the loop was doing its
	// thing.
	case <-timeoutCh:
		handleFailure(500, fmt.Errorf("timed out waiting for re-run of leader actions"))

	// Make sure we don't get stuck during shutdown
	case <-op.srv.shutdownCh:
	}

	reply.Index, _ = op.srv.State().LatestIndex()
	op.srv.setQueryMeta(&reply.QueryMeta)
	encoder.Encode(reply)
}

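// decodeStreamOutput decodes StreamErrWrapper frames from the decoder and
// exposes their payloads as an io.Reader via a pipe. Any decode or write
// error is sent on the returned channel and also closes the pipe.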
func decodeStreamOutput(decoder *codec.Decoder) (io.Reader, <-chan error) {
	pr, pw := io.Pipe()
	errCh := make(chan error, 1)

	go func() {
		defer close(errCh)

		for {
			var wrapper cstructs.StreamErrWrapper

			err := decoder.Decode(&wrapper)
			if err != nil {
				pw.CloseWithError(fmt.Errorf("failed to decode input: %v", err))
				errCh <- err
				return
			}

			if len(wrapper.Payload) != 0 {
				_, err = pw.Write(wrapper.Payload)
				if err != nil {
					pw.CloseWithError(err)
					errCh <- err
					return
				}
			}

			if errW := wrapper.Error; errW != nil {
				if errW.Message == io.EOF.Error() {
					pw.CloseWithError(io.EOF)
				} else {
					pw.CloseWithError(errors.New(errW.Message))
				}
				return
			}
		}
	}()

	return pr, errCh
}