1package raft 2 3import ( 4 "io" 5 "time" 6) 7 8// RPCResponse captures both a response and a potential error. 9type RPCResponse struct { 10 Response interface{} 11 Error error 12} 13 14// RPC has a command, and provides a response mechanism. 15type RPC struct { 16 Command interface{} 17 Reader io.Reader // Set only for InstallSnapshot 18 RespChan chan<- RPCResponse 19} 20 21// Respond is used to respond with a response, error or both 22func (r *RPC) Respond(resp interface{}, err error) { 23 r.RespChan <- RPCResponse{resp, err} 24} 25 26// Transport provides an interface for network transports 27// to allow Raft to communicate with other nodes. 28type Transport interface { 29 // Consumer returns a channel that can be used to 30 // consume and respond to RPC requests. 31 Consumer() <-chan RPC 32 33 // LocalAddr is used to return our local address to distinguish from our peers. 34 LocalAddr() ServerAddress 35 36 // AppendEntriesPipeline returns an interface that can be used to pipeline 37 // AppendEntries requests. 38 AppendEntriesPipeline(id ServerID, target ServerAddress) (AppendPipeline, error) 39 40 // AppendEntries sends the appropriate RPC to the target node. 41 AppendEntries(id ServerID, target ServerAddress, args *AppendEntriesRequest, resp *AppendEntriesResponse) error 42 43 // RequestVote sends the appropriate RPC to the target node. 44 RequestVote(id ServerID, target ServerAddress, args *RequestVoteRequest, resp *RequestVoteResponse) error 45 46 // InstallSnapshot is used to push a snapshot down to a follower. The data is read from 47 // the ReadCloser and streamed to the client. 48 InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error 49 50 // EncodePeer is used to serialize a peer's address. 51 EncodePeer(id ServerID, addr ServerAddress) []byte 52 53 // DecodePeer is used to deserialize a peer's address. 54 DecodePeer([]byte) ServerAddress 55 56 // SetHeartbeatHandler is used to setup a heartbeat handler 57 // as a fast-pass. This is to avoid head-of-line blocking from 58 // disk IO. If a Transport does not support this, it can simply 59 // ignore the call, and push the heartbeat onto the Consumer channel. 60 SetHeartbeatHandler(cb func(rpc RPC)) 61 62 // TimeoutNow is used to start a leadership transfer to the target node. 63 TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error 64} 65 66// WithClose is an interface that a transport may provide which 67// allows a transport to be shut down cleanly when a Raft instance 68// shuts down. 69// 70// It is defined separately from Transport as unfortunately it wasn't in the 71// original interface specification. 72type WithClose interface { 73 // Close permanently closes a transport, stopping 74 // any associated goroutines and freeing other resources. 75 Close() error 76} 77 78// LoopbackTransport is an interface that provides a loopback transport suitable for testing 79// e.g. InmemTransport. It's there so we don't have to rewrite tests. 80type LoopbackTransport interface { 81 Transport // Embedded transport reference 82 WithPeers // Embedded peer management 83 WithClose // with a close routine 84} 85 86// WithPeers is an interface that a transport may provide which allows for connection and 87// disconnection. Unless the transport is a loopback transport, the transport specified to 88// "Connect" is likely to be nil. 89type WithPeers interface { 90 Connect(peer ServerAddress, t Transport) // Connect a peer 91 Disconnect(peer ServerAddress) // Disconnect a given peer 92 DisconnectAll() // Disconnect all peers, possibly to reconnect them later 93} 94 95// AppendPipeline is used for pipelining AppendEntries requests. It is used 96// to increase the replication throughput by masking latency and better 97// utilizing bandwidth. 98type AppendPipeline interface { 99 // AppendEntries is used to add another request to the pipeline. 100 // The send may block which is an effective form of back-pressure. 101 AppendEntries(args *AppendEntriesRequest, resp *AppendEntriesResponse) (AppendFuture, error) 102 103 // Consumer returns a channel that can be used to consume 104 // response futures when they are ready. 105 Consumer() <-chan AppendFuture 106 107 // Close closes the pipeline and cancels all inflight RPCs 108 Close() error 109} 110 111// AppendFuture is used to return information about a pipelined AppendEntries request. 112type AppendFuture interface { 113 Future 114 115 // Start returns the time that the append request was started. 116 // It is always OK to call this method. 117 Start() time.Time 118 119 // Request holds the parameters of the AppendEntries call. 120 // It is always OK to call this method. 121 Request() *AppendEntriesRequest 122 123 // Response holds the results of the AppendEntries call. 124 // This method must only be called after the Error 125 // method returns, and will only be valid on success. 126 Response() *AppendEntriesResponse 127} 128