1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "go.etcd.io/etcd/client/pkg/v3/types" 19 "go.etcd.io/etcd/raft/v3/raftpb" 20 21 "go.uber.org/zap" 22) 23 24type remote struct { 25 lg *zap.Logger 26 localID types.ID 27 id types.ID 28 status *peerStatus 29 pipeline *pipeline 30} 31 32func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote { 33 picker := newURLPicker(urls) 34 status := newPeerStatus(tr.Logger, tr.ID, id) 35 pipeline := &pipeline{ 36 peerID: id, 37 tr: tr, 38 picker: picker, 39 status: status, 40 raft: tr.Raft, 41 errorc: tr.ErrorC, 42 } 43 pipeline.start() 44 45 return &remote{ 46 lg: tr.Logger, 47 localID: tr.ID, 48 id: id, 49 status: status, 50 pipeline: pipeline, 51 } 52} 53 54func (g *remote) send(m raftpb.Message) { 55 select { 56 case g.pipeline.msgc <- m: 57 default: 58 if g.status.isActive() { 59 if g.lg != nil { 60 g.lg.Warn( 61 "dropped internal Raft message since sending buffer is full (overloaded network)", 62 zap.String("message-type", m.Type.String()), 63 zap.String("local-member-id", g.localID.String()), 64 zap.String("from", types.ID(m.From).String()), 65 zap.String("remote-peer-id", g.id.String()), 66 zap.Bool("remote-peer-active", g.status.isActive()), 67 ) 68 } 69 } else { 70 if g.lg != nil { 71 g.lg.Warn( 72 "dropped Raft message since sending buffer is full (overloaded network)", 73 zap.String("message-type", m.Type.String()), 74 zap.String("local-member-id", g.localID.String()), 75 zap.String("from", types.ID(m.From).String()), 76 zap.String("remote-peer-id", g.id.String()), 77 zap.Bool("remote-peer-active", g.status.isActive()), 78 ) 79 } 80 } 81 sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() 82 } 83} 84 85func (g *remote) stop() { 86 g.pipeline.stop() 87} 88 89func (g *remote) Pause() { 90 g.stop() 91} 92 93func (g *remote) Resume() { 94 g.pipeline.start() 95} 96