1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "bytes" 19 "context" 20 "errors" 21 "io/ioutil" 22 "sync" 23 "time" 24 25 stats "go.etcd.io/etcd/etcdserver/api/v2stats" 26 "go.etcd.io/etcd/pkg/pbutil" 27 "go.etcd.io/etcd/pkg/types" 28 "go.etcd.io/etcd/raft" 29 "go.etcd.io/etcd/raft/raftpb" 30 31 "go.uber.org/zap" 32) 33 34const ( 35 connPerPipeline = 4 36 // pipelineBufSize is the size of pipeline buffer, which helps hold the 37 // temporary network latency. 38 // The size ensures that pipeline does not drop messages when the network 39 // is out of work for less than 1 second in good path. 40 pipelineBufSize = 64 41) 42 43var errStopped = errors.New("stopped") 44 45type pipeline struct { 46 peerID types.ID 47 48 tr *Transport 49 picker *urlPicker 50 status *peerStatus 51 raft Raft 52 errorc chan error 53 // deprecate when we depercate v2 API 54 followerStats *stats.FollowerStats 55 56 msgc chan raftpb.Message 57 // wait for the handling routines 58 wg sync.WaitGroup 59 stopc chan struct{} 60} 61 62func (p *pipeline) start() { 63 p.stopc = make(chan struct{}) 64 p.msgc = make(chan raftpb.Message, pipelineBufSize) 65 p.wg.Add(connPerPipeline) 66 for i := 0; i < connPerPipeline; i++ { 67 go p.handle() 68 } 69 70 if p.tr != nil && p.tr.Logger != nil { 71 p.tr.Logger.Info( 72 "started HTTP pipelining with remote peer", 73 zap.String("local-member-id", p.tr.ID.String()), 74 zap.String("remote-peer-id", p.peerID.String()), 75 ) 76 } else { 77 plog.Infof("started HTTP pipelining with peer %s", p.peerID) 78 } 79} 80 81func (p *pipeline) stop() { 82 close(p.stopc) 83 p.wg.Wait() 84 85 if p.tr != nil && p.tr.Logger != nil { 86 p.tr.Logger.Info( 87 "stopped HTTP pipelining with remote peer", 88 zap.String("local-member-id", p.tr.ID.String()), 89 zap.String("remote-peer-id", p.peerID.String()), 90 ) 91 } else { 92 plog.Infof("stopped HTTP pipelining with peer %s", p.peerID) 93 } 94} 95 96func (p *pipeline) handle() { 97 defer p.wg.Done() 98 99 for { 100 select { 101 case m := <-p.msgc: 102 start := time.Now() 103 err := p.post(pbutil.MustMarshal(&m)) 104 end := time.Now() 105 106 if err != nil { 107 p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error()) 108 109 if m.Type == raftpb.MsgApp && p.followerStats != nil { 110 p.followerStats.Fail() 111 } 112 p.raft.ReportUnreachable(m.To) 113 if isMsgSnap(m) { 114 p.raft.ReportSnapshot(m.To, raft.SnapshotFailure) 115 } 116 sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() 117 continue 118 } 119 120 p.status.activate() 121 if m.Type == raftpb.MsgApp && p.followerStats != nil { 122 p.followerStats.Succ(end.Sub(start)) 123 } 124 if isMsgSnap(m) { 125 p.raft.ReportSnapshot(m.To, raft.SnapshotFinish) 126 } 127 sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size())) 128 case <-p.stopc: 129 return 130 } 131 } 132} 133 134// post POSTs a data payload to a url. Returns nil if the POST succeeds, 135// error on any failure. 136func (p *pipeline) post(data []byte) (err error) { 137 u := p.picker.pick() 138 req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID) 139 140 done := make(chan struct{}, 1) 141 ctx, cancel := context.WithCancel(context.Background()) 142 req = req.WithContext(ctx) 143 go func() { 144 select { 145 case <-done: 146 case <-p.stopc: 147 waitSchedule() 148 cancel() 149 } 150 }() 151 152 resp, err := p.tr.pipelineRt.RoundTrip(req) 153 done <- struct{}{} 154 if err != nil { 155 p.picker.unreachable(u) 156 return err 157 } 158 defer resp.Body.Close() 159 b, err := ioutil.ReadAll(resp.Body) 160 if err != nil { 161 p.picker.unreachable(u) 162 return err 163 } 164 165 err = checkPostResponse(resp, b, req, p.peerID) 166 if err != nil { 167 p.picker.unreachable(u) 168 // errMemberRemoved is a critical error since a removed member should 169 // always be stopped. So we use reportCriticalError to report it to errorc. 170 if err == errMemberRemoved { 171 reportCriticalError(err, p.errorc) 172 } 173 return err 174 } 175 176 return nil 177} 178 179// waitSchedule waits other goroutines to be scheduled for a while 180func waitSchedule() { time.Sleep(time.Millisecond) } 181