1// Copyright 2016 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package raft 16 17import pb "go.etcd.io/etcd/raft/raftpb" 18 19// ReadState provides state for read only query. 20// It's caller's responsibility to call ReadIndex first before getting 21// this state from ready, it's also caller's duty to differentiate if this 22// state is what it requests through RequestCtx, eg. given a unique id as 23// RequestCtx 24type ReadState struct { 25 Index uint64 26 RequestCtx []byte 27} 28 29type readIndexStatus struct { 30 req pb.Message 31 index uint64 32 // NB: this never records 'false', but it's more convenient to use this 33 // instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If 34 // this becomes performance sensitive enough (doubtful), quorum.VoteResult 35 // can change to an API that is closer to that of CommittedIndex. 36 acks map[uint64]bool 37} 38 39type readOnly struct { 40 option ReadOnlyOption 41 pendingReadIndex map[string]*readIndexStatus 42 readIndexQueue []string 43} 44 45func newReadOnly(option ReadOnlyOption) *readOnly { 46 return &readOnly{ 47 option: option, 48 pendingReadIndex: make(map[string]*readIndexStatus), 49 } 50} 51 52// addRequest adds a read only reuqest into readonly struct. 53// `index` is the commit index of the raft state machine when it received 54// the read only request. 55// `m` is the original read only request message from the local or remote node. 56func (ro *readOnly) addRequest(index uint64, m pb.Message) { 57 s := string(m.Entries[0].Data) 58 if _, ok := ro.pendingReadIndex[s]; ok { 59 return 60 } 61 ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)} 62 ro.readIndexQueue = append(ro.readIndexQueue, s) 63} 64 65// recvAck notifies the readonly struct that the raft state machine received 66// an acknowledgment of the heartbeat that attached with the read only request 67// context. 68func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool { 69 rs, ok := ro.pendingReadIndex[string(context)] 70 if !ok { 71 return nil 72 } 73 74 rs.acks[id] = true 75 return rs.acks 76} 77 78// advance advances the read only request queue kept by the readonly struct. 79// It dequeues the requests until it finds the read only request that has 80// the same context as the given `m`. 81func (ro *readOnly) advance(m pb.Message) []*readIndexStatus { 82 var ( 83 i int 84 found bool 85 ) 86 87 ctx := string(m.Context) 88 rss := []*readIndexStatus{} 89 90 for _, okctx := range ro.readIndexQueue { 91 i++ 92 rs, ok := ro.pendingReadIndex[okctx] 93 if !ok { 94 panic("cannot find corresponding read state from pending map") 95 } 96 rss = append(rss, rs) 97 if okctx == ctx { 98 found = true 99 break 100 } 101 } 102 103 if found { 104 ro.readIndexQueue = ro.readIndexQueue[i:] 105 for _, rs := range rss { 106 delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data)) 107 } 108 return rss 109 } 110 111 return nil 112} 113 114// lastPendingRequestCtx returns the context of the last pending read only 115// request in readonly struct. 116func (ro *readOnly) lastPendingRequestCtx() string { 117 if len(ro.readIndexQueue) == 0 { 118 return "" 119 } 120 return ro.readIndexQueue[len(ro.readIndexQueue)-1] 121} 122