1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19//go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto 20 21/* 22Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks. 23*/ 24package benchmark 25 26import ( 27 "fmt" 28 "io" 29 "net" 30 "sync" 31 "testing" 32 "time" 33 34 "golang.org/x/net/context" 35 "google.golang.org/grpc" 36 testpb "google.golang.org/grpc/benchmark/grpc_testing" 37 "google.golang.org/grpc/benchmark/latency" 38 "google.golang.org/grpc/benchmark/stats" 39 "google.golang.org/grpc/grpclog" 40) 41 42// AddOne add 1 to the features slice 43func AddOne(features []int, featuresMaxPosition []int) { 44 for i := len(features) - 1; i >= 0; i-- { 45 features[i] = (features[i] + 1) 46 if features[i]/featuresMaxPosition[i] == 0 { 47 break 48 } 49 features[i] = features[i] % featuresMaxPosition[i] 50 } 51} 52 53// Allows reuse of the same testpb.Payload object. 54func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) { 55 if size < 0 { 56 grpclog.Fatalf("Requested a response with invalid length %d", size) 57 } 58 body := make([]byte, size) 59 switch t { 60 case testpb.PayloadType_COMPRESSABLE: 61 case testpb.PayloadType_UNCOMPRESSABLE: 62 grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported") 63 default: 64 grpclog.Fatalf("Unsupported payload type: %d", t) 65 } 66 p.Type = t 67 p.Body = body 68 return 69} 70 71func newPayload(t testpb.PayloadType, size int) *testpb.Payload { 72 p := new(testpb.Payload) 73 setPayload(p, t, size) 74 return p 75} 76 77type testServer struct { 78} 79 80func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 81 return &testpb.SimpleResponse{ 82 Payload: newPayload(in.ResponseType, int(in.ResponseSize)), 83 }, nil 84} 85 86func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { 87 response := &testpb.SimpleResponse{ 88 Payload: new(testpb.Payload), 89 } 90 in := new(testpb.SimpleRequest) 91 for { 92 // use ServerStream directly to reuse the same testpb.SimpleRequest object 93 err := stream.(grpc.ServerStream).RecvMsg(in) 94 if err == io.EOF { 95 // read done. 96 return nil 97 } 98 if err != nil { 99 return err 100 } 101 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) 102 if err := stream.Send(response); err != nil { 103 return err 104 } 105 } 106} 107 108// byteBufServer is a gRPC server that sends and receives byte buffer. 109// The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead. 110type byteBufServer struct { 111 respSize int32 112} 113 114// UnaryCall is an empty function and is not used for benchmark. 115// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated. 116func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 117 return &testpb.SimpleResponse{}, nil 118} 119 120func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { 121 for { 122 var in []byte 123 err := stream.(grpc.ServerStream).RecvMsg(&in) 124 if err == io.EOF { 125 return nil 126 } 127 if err != nil { 128 return err 129 } 130 out := make([]byte, s.respSize) 131 if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { 132 return err 133 } 134 } 135} 136 137// ServerInfo contains the information to create a gRPC benchmark server. 138type ServerInfo struct { 139 // Addr is the address of the server. 140 Addr string 141 142 // Type is the type of the server. 143 // It should be "protobuf" or "bytebuf". 144 Type string 145 146 // Metadata is an optional configuration. 147 // For "protobuf", it's ignored. 148 // For "bytebuf", it should be an int representing response size. 149 Metadata interface{} 150 151 // Network can simulate latency 152 Network *latency.Network 153} 154 155// StartServer starts a gRPC server serving a benchmark service according to info. 156// It returns its listen address and a function to stop the server. 157func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) { 158 lis, err := net.Listen("tcp", info.Addr) 159 if err != nil { 160 grpclog.Fatalf("Failed to listen: %v", err) 161 } 162 nw := info.Network 163 if nw != nil { 164 lis = nw.Listener(lis) 165 } 166 opts = append(opts, grpc.WriteBufferSize(128*1024)) 167 opts = append(opts, grpc.ReadBufferSize(128*1024)) 168 s := grpc.NewServer(opts...) 169 switch info.Type { 170 case "protobuf": 171 testpb.RegisterBenchmarkServiceServer(s, &testServer{}) 172 case "bytebuf": 173 respSize, ok := info.Metadata.(int32) 174 if !ok { 175 grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type) 176 } 177 testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize}) 178 default: 179 grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type) 180 } 181 go s.Serve(lis) 182 return lis.Addr().String(), func() { 183 s.Stop() 184 } 185} 186 187// DoUnaryCall performs an unary RPC with given stub and request and response sizes. 188func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error { 189 pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) 190 req := &testpb.SimpleRequest{ 191 ResponseType: pl.Type, 192 ResponseSize: int32(respSize), 193 Payload: pl, 194 } 195 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 196 return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 197 } 198 return nil 199} 200 201// DoStreamingRoundTrip performs a round trip for a single streaming rpc. 202func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { 203 pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize) 204 req := &testpb.SimpleRequest{ 205 ResponseType: pl.Type, 206 ResponseSize: int32(respSize), 207 Payload: pl, 208 } 209 if err := stream.Send(req); err != nil { 210 return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err) 211 } 212 if _, err := stream.Recv(); err != nil { 213 // EOF is a valid error here. 214 if err == io.EOF { 215 return nil 216 } 217 return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err) 218 } 219 return nil 220} 221 222// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer. 223func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { 224 out := make([]byte, reqSize) 225 if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil { 226 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err) 227 } 228 var in []byte 229 if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil { 230 // EOF is a valid error here. 231 if err == io.EOF { 232 return nil 233 } 234 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err) 235 } 236 return nil 237} 238 239// NewClientConn creates a gRPC client connection to addr. 240func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { 241 opts = append(opts, grpc.WithWriteBufferSize(128*1024)) 242 opts = append(opts, grpc.WithReadBufferSize(128*1024)) 243 conn, err := grpc.Dial(addr, opts...) 244 if err != nil { 245 grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err) 246 } 247 return conn 248} 249 250func runUnary(b *testing.B, benchFeatures stats.Features) { 251 s := stats.AddStats(b, 38) 252 nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} 253 target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) 254 defer stopper() 255 conn := NewClientConn( 256 target, grpc.WithInsecure(), 257 grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) { 258 return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout) 259 }), 260 ) 261 tc := testpb.NewBenchmarkServiceClient(conn) 262 263 // Warm up connection. 264 for i := 0; i < 10; i++ { 265 unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) 266 } 267 ch := make(chan int, benchFeatures.MaxConcurrentCalls*4) 268 var ( 269 mu sync.Mutex 270 wg sync.WaitGroup 271 ) 272 wg.Add(benchFeatures.MaxConcurrentCalls) 273 274 // Distribute the b.N calls over maxConcurrentCalls workers. 275 for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { 276 go func() { 277 for range ch { 278 start := time.Now() 279 unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) 280 elapse := time.Since(start) 281 mu.Lock() 282 s.Add(elapse) 283 mu.Unlock() 284 } 285 wg.Done() 286 }() 287 } 288 b.ResetTimer() 289 for i := 0; i < b.N; i++ { 290 ch <- i 291 } 292 close(ch) 293 wg.Wait() 294 b.StopTimer() 295 conn.Close() 296} 297 298func runStream(b *testing.B, benchFeatures stats.Features) { 299 s := stats.AddStats(b, 38) 300 nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu} 301 target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1))) 302 defer stopper() 303 conn := NewClientConn( 304 target, grpc.WithInsecure(), 305 grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) { 306 return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout) 307 }), 308 ) 309 tc := testpb.NewBenchmarkServiceClient(conn) 310 311 // Warm up connection. 312 stream, err := tc.StreamingCall(context.Background()) 313 if err != nil { 314 b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) 315 } 316 for i := 0; i < 10; i++ { 317 streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) 318 } 319 320 ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4) 321 var ( 322 mu sync.Mutex 323 wg sync.WaitGroup 324 ) 325 wg.Add(benchFeatures.MaxConcurrentCalls) 326 327 // Distribute the b.N calls over maxConcurrentCalls workers. 328 for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ { 329 stream, err := tc.StreamingCall(context.Background()) 330 if err != nil { 331 b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) 332 } 333 go func() { 334 for range ch { 335 start := time.Now() 336 streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes) 337 elapse := time.Since(start) 338 mu.Lock() 339 s.Add(elapse) 340 mu.Unlock() 341 } 342 wg.Done() 343 }() 344 } 345 b.ResetTimer() 346 for i := 0; i < b.N; i++ { 347 ch <- struct{}{} 348 } 349 close(ch) 350 wg.Wait() 351 b.StopTimer() 352 conn.Close() 353} 354func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { 355 if err := DoUnaryCall(client, reqSize, respSize); err != nil { 356 grpclog.Fatalf("DoUnaryCall failed: %v", err) 357 } 358} 359 360func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { 361 if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { 362 grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) 363 } 364} 365