1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19//go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto 20 21/* 22Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks. 23*/ 24package benchmark 25 26import ( 27 "context" 28 "fmt" 29 "io" 30 "log" 31 "net" 32 33 "google.golang.org/grpc" 34 testpb "google.golang.org/grpc/benchmark/grpc_testing" 35 "google.golang.org/grpc/codes" 36 "google.golang.org/grpc/grpclog" 37 "google.golang.org/grpc/status" 38) 39 40// Allows reuse of the same testpb.Payload object. 41func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) { 42 if size < 0 { 43 grpclog.Fatalf("Requested a response with invalid length %d", size) 44 } 45 body := make([]byte, size) 46 switch t { 47 case testpb.PayloadType_COMPRESSABLE: 48 case testpb.PayloadType_UNCOMPRESSABLE: 49 grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported") 50 default: 51 grpclog.Fatalf("Unsupported payload type: %d", t) 52 } 53 p.Type = t 54 p.Body = body 55} 56 57// NewPayload creates a payload with the given type and size. 58func NewPayload(t testpb.PayloadType, size int) *testpb.Payload { 59 p := new(testpb.Payload) 60 setPayload(p, t, size) 61 return p 62} 63 64type testServer struct { 65} 66 67func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 68 return &testpb.SimpleResponse{ 69 Payload: NewPayload(in.ResponseType, int(in.ResponseSize)), 70 }, nil 71} 72 73func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { 74 response := &testpb.SimpleResponse{ 75 Payload: new(testpb.Payload), 76 } 77 in := new(testpb.SimpleRequest) 78 for { 79 // use ServerStream directly to reuse the same testpb.SimpleRequest object 80 err := stream.(grpc.ServerStream).RecvMsg(in) 81 if err == io.EOF { 82 // read done. 83 return nil 84 } 85 if err != nil { 86 return err 87 } 88 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) 89 if err := stream.Send(response); err != nil { 90 return err 91 } 92 } 93} 94 95func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error { 96 in := new(testpb.SimpleRequest) 97 // Receive a message to learn response type and size. 98 err := stream.RecvMsg(in) 99 if err == io.EOF { 100 // read done. 101 return nil 102 } 103 if err != nil { 104 return err 105 } 106 107 response := &testpb.SimpleResponse{ 108 Payload: new(testpb.Payload), 109 } 110 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) 111 112 go func() { 113 for { 114 // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest. 115 err := stream.RecvMsg(in) 116 switch status.Code(err) { 117 case codes.Canceled: 118 case codes.OK: 119 default: 120 log.Fatalf("server recv error: %v", err) 121 } 122 } 123 }() 124 125 go func() { 126 for { 127 err := stream.Send(response) 128 switch status.Code(err) { 129 case codes.Unavailable: 130 case codes.OK: 131 default: 132 log.Fatalf("server send error: %v", err) 133 } 134 } 135 }() 136 137 <-stream.Context().Done() 138 return stream.Context().Err() 139} 140 141// byteBufServer is a gRPC server that sends and receives byte buffer. 142// The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead. 143type byteBufServer struct { 144 respSize int32 145} 146 147// UnaryCall is an empty function and is not used for benchmark. 148// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated. 149func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 150 return &testpb.SimpleResponse{}, nil 151} 152 153func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { 154 for { 155 var in []byte 156 err := stream.(grpc.ServerStream).RecvMsg(&in) 157 if err == io.EOF { 158 return nil 159 } 160 if err != nil { 161 return err 162 } 163 out := make([]byte, s.respSize) 164 if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { 165 return err 166 } 167 } 168} 169 170func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error { 171 for { 172 var in []byte 173 err := stream.(grpc.ServerStream).RecvMsg(&in) 174 if err == io.EOF { 175 return nil 176 } 177 if err != nil { 178 return err 179 } 180 out := make([]byte, s.respSize) 181 if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { 182 return err 183 } 184 } 185} 186 187// ServerInfo contains the information to create a gRPC benchmark server. 188type ServerInfo struct { 189 // Type is the type of the server. 190 // It should be "protobuf" or "bytebuf". 191 Type string 192 193 // Metadata is an optional configuration. 194 // For "protobuf", it's ignored. 195 // For "bytebuf", it should be an int representing response size. 196 Metadata interface{} 197 198 // Listener is the network listener for the server to use 199 Listener net.Listener 200} 201 202// StartServer starts a gRPC server serving a benchmark service according to info. 203// It returns a function to stop the server. 204func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() { 205 opts = append(opts, grpc.WriteBufferSize(128*1024)) 206 opts = append(opts, grpc.ReadBufferSize(128*1024)) 207 s := grpc.NewServer(opts...) 208 switch info.Type { 209 case "protobuf": 210 testpb.RegisterBenchmarkServiceServer(s, &testServer{}) 211 case "bytebuf": 212 respSize, ok := info.Metadata.(int32) 213 if !ok { 214 grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type) 215 } 216 testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize}) 217 default: 218 grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type) 219 } 220 go s.Serve(info.Listener) 221 return func() { 222 s.Stop() 223 } 224} 225 226// DoUnaryCall performs an unary RPC with given stub and request and response sizes. 227func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error { 228 pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) 229 req := &testpb.SimpleRequest{ 230 ResponseType: pl.Type, 231 ResponseSize: int32(respSize), 232 Payload: pl, 233 } 234 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 235 return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 236 } 237 return nil 238} 239 240// DoStreamingRoundTrip performs a round trip for a single streaming rpc. 241func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { 242 pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) 243 req := &testpb.SimpleRequest{ 244 ResponseType: pl.Type, 245 ResponseSize: int32(respSize), 246 Payload: pl, 247 } 248 if err := stream.Send(req); err != nil { 249 return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err) 250 } 251 if _, err := stream.Recv(); err != nil { 252 // EOF is a valid error here. 253 if err == io.EOF { 254 return nil 255 } 256 return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err) 257 } 258 return nil 259} 260 261// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer. 262func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { 263 out := make([]byte, reqSize) 264 if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil { 265 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err) 266 } 267 var in []byte 268 if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil { 269 // EOF is a valid error here. 270 if err == io.EOF { 271 return nil 272 } 273 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err) 274 } 275 return nil 276} 277 278// NewClientConn creates a gRPC client connection to addr. 279func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { 280 return NewClientConnWithContext(context.Background(), addr, opts...) 281} 282 283// NewClientConnWithContext creates a gRPC client connection to addr using ctx. 284func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn { 285 opts = append(opts, grpc.WithWriteBufferSize(128*1024)) 286 opts = append(opts, grpc.WithReadBufferSize(128*1024)) 287 conn, err := grpc.DialContext(ctx, addr, opts...) 288 if err != nil { 289 grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err) 290 } 291 return conn 292} 293