1// Copyright 2015 The etcd Authors 2// 3// Licensed under the Apache License, Version 2.0 (the "License"); 4// you may not use this file except in compliance with the License. 5// You may obtain a copy of the License at 6// 7// http://www.apache.org/licenses/LICENSE-2.0 8// 9// Unless required by applicable law or agreed to in writing, software 10// distributed under the License is distributed on an "AS IS" BASIS, 11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12// See the License for the specific language governing permissions and 13// limitations under the License. 14 15package rafthttp 16 17import ( 18 "context" 19 "net/http/httptest" 20 "sync" 21 "testing" 22 "time" 23 24 stats "go.etcd.io/etcd/etcdserver/api/v2stats" 25 "go.etcd.io/etcd/pkg/types" 26 "go.etcd.io/etcd/raft" 27 "go.etcd.io/etcd/raft/raftpb" 28) 29 30func BenchmarkSendingMsgApp(b *testing.B) { 31 // member 1 32 tr := &Transport{ 33 ID: types.ID(1), 34 ClusterID: types.ID(1), 35 Raft: &fakeRaft{}, 36 ServerStats: newServerStats(), 37 LeaderStats: stats.NewLeaderStats("1"), 38 } 39 tr.Start() 40 srv := httptest.NewServer(tr.Handler()) 41 defer srv.Close() 42 43 // member 2 44 r := &countRaft{} 45 tr2 := &Transport{ 46 ID: types.ID(2), 47 ClusterID: types.ID(1), 48 Raft: r, 49 ServerStats: newServerStats(), 50 LeaderStats: stats.NewLeaderStats("2"), 51 } 52 tr2.Start() 53 srv2 := httptest.NewServer(tr2.Handler()) 54 defer srv2.Close() 55 56 tr.AddPeer(types.ID(2), []string{srv2.URL}) 57 defer tr.Stop() 58 tr2.AddPeer(types.ID(1), []string{srv.URL}) 59 defer tr2.Stop() 60 if !waitStreamWorking(tr.Get(types.ID(2)).(*peer)) { 61 b.Fatalf("stream from 1 to 2 is not in work as expected") 62 } 63 64 b.ReportAllocs() 65 b.SetBytes(64) 66 67 b.ResetTimer() 68 data := make([]byte, 64) 69 for i := 0; i < b.N; i++ { 70 tr.Send([]raftpb.Message{ 71 { 72 Type: raftpb.MsgApp, 73 From: 1, 74 To: 2, 75 Index: uint64(i), 76 Entries: []raftpb.Entry{ 77 { 78 Index: uint64(i + 1), 79 Data: data, 80 }, 81 }, 82 }, 83 }) 84 } 85 // wait until all messages are received by the target raft 86 for r.count() != b.N { 87 time.Sleep(time.Millisecond) 88 } 89 b.StopTimer() 90} 91 92type countRaft struct { 93 mu sync.Mutex 94 cnt int 95} 96 97func (r *countRaft) Process(ctx context.Context, m raftpb.Message) error { 98 r.mu.Lock() 99 defer r.mu.Unlock() 100 r.cnt++ 101 return nil 102} 103 104func (r *countRaft) IsIDRemoved(id uint64) bool { return false } 105 106func (r *countRaft) ReportUnreachable(id uint64) {} 107 108func (r *countRaft) ReportSnapshot(id uint64, status raft.SnapshotStatus) {} 109 110func (r *countRaft) count() int { 111 r.mu.Lock() 112 defer r.mu.Unlock() 113 return r.cnt 114} 115