1package raft
2
3import (
4	"io"
5	"time"
6)
7
8// RPCResponse captures both a response and a potential error.
9type RPCResponse struct {
10	Response interface{}
11	Error    error
12}
13
14// RPC has a command, and provides a response mechanism.
15type RPC struct {
16	Command  interface{}
17	Reader   io.Reader // Set only for InstallSnapshot
18	RespChan chan<- RPCResponse
19}
20
21// Respond is used to respond with a response, error or both
22func (r *RPC) Respond(resp interface{}, err error) {
23	r.RespChan <- RPCResponse{resp, err}
24}
25
26// Transport provides an interface for network transports
27// to allow Raft to communicate with other nodes.
28type Transport interface {
29	// Consumer returns a channel that can be used to
30	// consume and respond to RPC requests.
31	Consumer() <-chan RPC
32
33	// LocalAddr is used to return our local address to distinguish from our peers.
34	LocalAddr() ServerAddress
35
36	// AppendEntriesPipeline returns an interface that can be used to pipeline
37	// AppendEntries requests.
38	AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error)
39
40	// AppendEntries sends the appropriate RPC to the target node.
41	AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error
42
43	// RequestVote sends the appropriate RPC to the target node.
44	RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error
45
46	// InstallSnapshot is used to push a snapshot down to a follower. The data is read from
47	// the ReadCloser and streamed to the client.
48	InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error
49
50	// EncodePeer is used to serialize a peer's address.
51	EncodePeer(id ServerID, addr ServerAddress) []byte
52
53	// DecodePeer is used to deserialize a peer's address.
54	DecodePeer([]byte) ServerAddress
55
56	// SetHeartbeatHandler is used to setup a heartbeat handler
57	// as a fast-pass. This is to avoid head-of-line blocking from
58	// disk IO. If a Transport does not support this, it can simply
59	// ignore the call, and push the heartbeat onto the Consumer channel.
60	SetHeartbeatHandler(cb func(rpc RPC))
61
62	// TimeoutNow is used to start a leadership transfer to the target node.
63	TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error
64}
65
66// WithClose is an interface that a transport may provide which
67// allows a transport to be shut down cleanly when a Raft instance
68// shuts down.
69//
70// It is defined separately from Transport as unfortunately it wasn't in the
71// original interface specification.
72type WithClose interface {
73	// Close permanently closes a transport, stopping
74	// any associated goroutines and freeing other resources.
75	Close() error
76}
77
78// LoopbackTransport is an interface that provides a loopback transport suitable for testing
79// e.g. InmemTransport. It's there so we don't have to rewrite tests.
80type LoopbackTransport interface {
81	Transport // Embedded transport reference
82	WithPeers // Embedded peer management
83	WithClose // with a close routine
84}
85
86// WithPeers is an interface that a transport may provide which allows for connection and
87// disconnection. Unless the transport is a loopback transport, the transport specified to
88// "Connect" is likely to be nil.
89type WithPeers interface {
90	Connect(peer ServerAddress, t Transport) // Connect a peer
91	Disconnect(peer ServerAddress)           // Disconnect a given peer
92	DisconnectAll()                          // Disconnect all peers, possibly to reconnect them later
93}
94
95// AppendPipeline is used for pipelining AppendEntries requests. It is used
96// to increase the replication throughput by masking latency and better
97// utilizing bandwidth.
98type AppendPipeline interface {
99	// AppendEntries is used to add another request to the pipeline.
100	// The send may block which is an effective form of back-pressure.
101	AppendEntries(args *AppendEntriesRequest, resp *AppendEntriesResponse) (AppendFuture, error)
102
103	// Consumer returns a channel that can be used to consume
104	// response futures when they are ready.
105	Consumer() <-chan AppendFuture
106
107	// Close closes the pipeline and cancels all inflight RPCs
108	Close() error
109}
110
111// AppendFuture is used to return information about a pipelined AppendEntries request.
112type AppendFuture interface {
113	Future
114
115	// Start returns the time that the append request was started.
116	// It is always OK to call this method.
117	Start() time.Time
118
119	// Request holds the parameters of the AppendEntries call.
120	// It is always OK to call this method.
121	Request() *AppendEntriesRequest
122
123	// Response holds the results of the AppendEntries call.
124	// This method must only be called after the Error
125	// method returns, and will only be valid on success.
126	Response() *AppendEntriesResponse
127}
128