1/* 2 * 3 * Copyright 2014 gRPC authors. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 * 17 */ 18 19/* 20Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks. 21*/ 22package benchmark 23 24import ( 25 "context" 26 "fmt" 27 "io" 28 "log" 29 "net" 30 31 "google.golang.org/grpc" 32 testpb "google.golang.org/grpc/benchmark/grpc_testing" 33 "google.golang.org/grpc/codes" 34 "google.golang.org/grpc/grpclog" 35 "google.golang.org/grpc/status" 36) 37 38// Allows reuse of the same testpb.Payload object. 39func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) { 40 if size < 0 { 41 grpclog.Fatalf("Requested a response with invalid length %d", size) 42 } 43 body := make([]byte, size) 44 switch t { 45 case testpb.PayloadType_COMPRESSABLE: 46 case testpb.PayloadType_UNCOMPRESSABLE: 47 grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported") 48 default: 49 grpclog.Fatalf("Unsupported payload type: %d", t) 50 } 51 p.Type = t 52 p.Body = body 53} 54 55// NewPayload creates a payload with the given type and size. 56func NewPayload(t testpb.PayloadType, size int) *testpb.Payload { 57 p := new(testpb.Payload) 58 setPayload(p, t, size) 59 return p 60} 61 62type testServer struct { 63 testpb.UnimplementedBenchmarkServiceServer 64} 65 66func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 67 return &testpb.SimpleResponse{ 68 Payload: NewPayload(in.ResponseType, int(in.ResponseSize)), 69 }, nil 70} 71 72func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { 73 response := &testpb.SimpleResponse{ 74 Payload: new(testpb.Payload), 75 } 76 in := new(testpb.SimpleRequest) 77 for { 78 // use ServerStream directly to reuse the same testpb.SimpleRequest object 79 err := stream.(grpc.ServerStream).RecvMsg(in) 80 if err == io.EOF { 81 // read done. 82 return nil 83 } 84 if err != nil { 85 return err 86 } 87 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) 88 if err := stream.Send(response); err != nil { 89 return err 90 } 91 } 92} 93 94func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error { 95 in := new(testpb.SimpleRequest) 96 // Receive a message to learn response type and size. 97 err := stream.RecvMsg(in) 98 if err == io.EOF { 99 // read done. 100 return nil 101 } 102 if err != nil { 103 return err 104 } 105 106 response := &testpb.SimpleResponse{ 107 Payload: new(testpb.Payload), 108 } 109 setPayload(response.Payload, in.ResponseType, int(in.ResponseSize)) 110 111 go func() { 112 for { 113 // Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest. 114 err := stream.RecvMsg(in) 115 switch status.Code(err) { 116 case codes.Canceled: 117 return 118 case codes.OK: 119 default: 120 log.Fatalf("server recv error: %v", err) 121 } 122 } 123 }() 124 125 go func() { 126 for { 127 err := stream.Send(response) 128 switch status.Code(err) { 129 case codes.Unavailable: 130 return 131 case codes.OK: 132 default: 133 log.Fatalf("server send error: %v", err) 134 } 135 } 136 }() 137 138 <-stream.Context().Done() 139 return stream.Context().Err() 140} 141 142// byteBufServer is a gRPC server that sends and receives byte buffer. 143// The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead. 144type byteBufServer struct { 145 testpb.UnimplementedBenchmarkServiceServer 146 respSize int32 147} 148 149// UnaryCall is an empty function and is not used for benchmark. 150// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated. 151func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { 152 return &testpb.SimpleResponse{}, nil 153} 154 155func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error { 156 for { 157 var in []byte 158 err := stream.(grpc.ServerStream).RecvMsg(&in) 159 if err == io.EOF { 160 return nil 161 } 162 if err != nil { 163 return err 164 } 165 out := make([]byte, s.respSize) 166 if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { 167 return err 168 } 169 } 170} 171 172func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error { 173 for { 174 var in []byte 175 err := stream.(grpc.ServerStream).RecvMsg(&in) 176 if err == io.EOF { 177 return nil 178 } 179 if err != nil { 180 return err 181 } 182 out := make([]byte, s.respSize) 183 if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil { 184 return err 185 } 186 } 187} 188 189// ServerInfo contains the information to create a gRPC benchmark server. 190type ServerInfo struct { 191 // Type is the type of the server. 192 // It should be "protobuf" or "bytebuf". 193 Type string 194 195 // Metadata is an optional configuration. 196 // For "protobuf", it's ignored. 197 // For "bytebuf", it should be an int representing response size. 198 Metadata interface{} 199 200 // Listener is the network listener for the server to use 201 Listener net.Listener 202} 203 204// StartServer starts a gRPC server serving a benchmark service according to info. 205// It returns a function to stop the server. 206func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() { 207 opts = append(opts, grpc.WriteBufferSize(128*1024)) 208 opts = append(opts, grpc.ReadBufferSize(128*1024)) 209 s := grpc.NewServer(opts...) 210 switch info.Type { 211 case "protobuf": 212 testpb.RegisterBenchmarkServiceServer(s, &testServer{}) 213 case "bytebuf": 214 respSize, ok := info.Metadata.(int32) 215 if !ok { 216 grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type) 217 } 218 testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize}) 219 default: 220 grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type) 221 } 222 go s.Serve(info.Listener) 223 return func() { 224 s.Stop() 225 } 226} 227 228// DoUnaryCall performs an unary RPC with given stub and request and response sizes. 229func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error { 230 pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) 231 req := &testpb.SimpleRequest{ 232 ResponseType: pl.Type, 233 ResponseSize: int32(respSize), 234 Payload: pl, 235 } 236 if _, err := tc.UnaryCall(context.Background(), req); err != nil { 237 return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err) 238 } 239 return nil 240} 241 242// DoStreamingRoundTrip performs a round trip for a single streaming rpc. 243func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { 244 pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize) 245 req := &testpb.SimpleRequest{ 246 ResponseType: pl.Type, 247 ResponseSize: int32(respSize), 248 Payload: pl, 249 } 250 if err := stream.Send(req); err != nil { 251 return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err) 252 } 253 if _, err := stream.Recv(); err != nil { 254 // EOF is a valid error here. 255 if err == io.EOF { 256 return nil 257 } 258 return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err) 259 } 260 return nil 261} 262 263// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer. 264func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error { 265 out := make([]byte, reqSize) 266 if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil { 267 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err) 268 } 269 var in []byte 270 if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil { 271 // EOF is a valid error here. 272 if err == io.EOF { 273 return nil 274 } 275 return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err) 276 } 277 return nil 278} 279 280// NewClientConn creates a gRPC client connection to addr. 281func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { 282 return NewClientConnWithContext(context.Background(), addr, opts...) 283} 284 285// NewClientConnWithContext creates a gRPC client connection to addr using ctx. 286func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn { 287 opts = append(opts, grpc.WithWriteBufferSize(128*1024)) 288 opts = append(opts, grpc.WithReadBufferSize(128*1024)) 289 conn, err := grpc.DialContext(ctx, addr, opts...) 290 if err != nil { 291 grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err) 292 } 293 return conn 294} 295