1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19/*
Package benchmark implements the building blocks to set up end-to-end gRPC benchmarks.
21*/
22package benchmark
23
24import (
25	"context"
26	"fmt"
27	"io"
28	"log"
29	"net"
30
31	"google.golang.org/grpc"
32	testpb "google.golang.org/grpc/benchmark/grpc_testing"
33	"google.golang.org/grpc/codes"
34	"google.golang.org/grpc/grpclog"
35	"google.golang.org/grpc/status"
36)
37
38// Allows reuse of the same testpb.Payload object.
39func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
40	if size < 0 {
41		grpclog.Fatalf("Requested a response with invalid length %d", size)
42	}
43	body := make([]byte, size)
44	switch t {
45	case testpb.PayloadType_COMPRESSABLE:
46	case testpb.PayloadType_UNCOMPRESSABLE:
47		grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
48	default:
49		grpclog.Fatalf("Unsupported payload type: %d", t)
50	}
51	p.Type = t
52	p.Body = body
53}
54
55// NewPayload creates a payload with the given type and size.
56func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
57	p := new(testpb.Payload)
58	setPayload(p, t, size)
59	return p
60}
61
// testServer is the protobuf-based benchmark service implementation; it is
// the server StartServer registers for the "protobuf" server type.
type testServer struct {
	// Embedded so that any service method not defined on testServer falls
	// back to the generated "unimplemented" version.
	testpb.UnimplementedBenchmarkServiceServer
}
65
66func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
67	return &testpb.SimpleResponse{
68		Payload: NewPayload(in.ResponseType, int(in.ResponseSize)),
69	}, nil
70}
71
72func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
73	response := &testpb.SimpleResponse{
74		Payload: new(testpb.Payload),
75	}
76	in := new(testpb.SimpleRequest)
77	for {
78		// use ServerStream directly to reuse the same testpb.SimpleRequest object
79		err := stream.(grpc.ServerStream).RecvMsg(in)
80		if err == io.EOF {
81			// read done.
82			return nil
83		}
84		if err != nil {
85			return err
86		}
87		setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
88		if err := stream.Send(response); err != nil {
89			return err
90		}
91	}
92}
93
94func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
95	in := new(testpb.SimpleRequest)
96	// Receive a message to learn response type and size.
97	err := stream.RecvMsg(in)
98	if err == io.EOF {
99		// read done.
100		return nil
101	}
102	if err != nil {
103		return err
104	}
105
106	response := &testpb.SimpleResponse{
107		Payload: new(testpb.Payload),
108	}
109	setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
110
111	go func() {
112		for {
113			// Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest.
114			err := stream.RecvMsg(in)
115			switch status.Code(err) {
116			case codes.Canceled:
117				return
118			case codes.OK:
119			default:
120				log.Fatalf("server recv error: %v", err)
121			}
122		}
123	}()
124
125	go func() {
126		for {
127			err := stream.Send(response)
128			switch status.Code(err) {
129			case codes.Unavailable:
130				return
131			case codes.OK:
132			default:
133				log.Fatalf("server send error: %v", err)
134			}
135		}
136	}()
137
138	<-stream.Context().Done()
139	return stream.Context().Err()
140}
141
// byteBufServer is a gRPC server that sends and receives raw byte buffers.
// The purpose is to benchmark the gRPC performance without protobuf
// serialization/deserialization overhead.
type byteBufServer struct {
	// Embedded so that any service method not defined on byteBufServer falls
	// back to the generated "unimplemented" version.
	testpb.UnimplementedBenchmarkServiceServer
	// respSize is the length, in bytes, of every response buffer the
	// streaming handlers allocate and send.
	respSize int32
}
148
149// UnaryCall is an empty function and is not used for benchmark.
150// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
151func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
152	return &testpb.SimpleResponse{}, nil
153}
154
155func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
156	for {
157		var in []byte
158		err := stream.(grpc.ServerStream).RecvMsg(&in)
159		if err == io.EOF {
160			return nil
161		}
162		if err != nil {
163			return err
164		}
165		out := make([]byte, s.respSize)
166		if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
167			return err
168		}
169	}
170}
171
172func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
173	for {
174		var in []byte
175		err := stream.(grpc.ServerStream).RecvMsg(&in)
176		if err == io.EOF {
177			return nil
178		}
179		if err != nil {
180			return err
181		}
182		out := make([]byte, s.respSize)
183		if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
184			return err
185		}
186	}
187}
188
// ServerInfo contains the information to create a gRPC benchmark server.
type ServerInfo struct {
	// Type is the type of the server.
	// It should be "protobuf" or "bytebuf"; StartServer treats any other
	// value as a fatal error.
	Type string

	// Metadata is an optional configuration.
	// For "protobuf", it's ignored.
	// For "bytebuf", it should be an int32 holding the response size in
	// bytes; StartServer type-asserts the value to int32.
	Metadata interface{}

	// Listener is the network listener for the server to use.
	Listener net.Listener
}
203
204// StartServer starts a gRPC server serving a benchmark service according to info.
205// It returns a function to stop the server.
206func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
207	opts = append(opts, grpc.WriteBufferSize(128*1024))
208	opts = append(opts, grpc.ReadBufferSize(128*1024))
209	s := grpc.NewServer(opts...)
210	switch info.Type {
211	case "protobuf":
212		testpb.RegisterBenchmarkServiceServer(s, &testServer{})
213	case "bytebuf":
214		respSize, ok := info.Metadata.(int32)
215		if !ok {
216			grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
217		}
218		testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
219	default:
220		grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
221	}
222	go s.Serve(info.Listener)
223	return func() {
224		s.Stop()
225	}
226}
227
228// DoUnaryCall performs an unary RPC with given stub and request and response sizes.
229func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
230	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
231	req := &testpb.SimpleRequest{
232		ResponseType: pl.Type,
233		ResponseSize: int32(respSize),
234		Payload:      pl,
235	}
236	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
237		return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
238	}
239	return nil
240}
241
242// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
243func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
244	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
245	req := &testpb.SimpleRequest{
246		ResponseType: pl.Type,
247		ResponseSize: int32(respSize),
248		Payload:      pl,
249	}
250	if err := stream.Send(req); err != nil {
251		return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
252	}
253	if _, err := stream.Recv(); err != nil {
254		// EOF is a valid error here.
255		if err == io.EOF {
256			return nil
257		}
258		return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
259	}
260	return nil
261}
262
263// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
264func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
265	out := make([]byte, reqSize)
266	if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
267		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
268	}
269	var in []byte
270	if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
271		// EOF is a valid error here.
272		if err == io.EOF {
273			return nil
274		}
275		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
276	}
277	return nil
278}
279
280// NewClientConn creates a gRPC client connection to addr.
281func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
282	return NewClientConnWithContext(context.Background(), addr, opts...)
283}
284
285// NewClientConnWithContext creates a gRPC client connection to addr using ctx.
286func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
287	opts = append(opts, grpc.WithWriteBufferSize(128*1024))
288	opts = append(opts, grpc.WithReadBufferSize(128*1024))
289	conn, err := grpc.DialContext(ctx, addr, opts...)
290	if err != nil {
291		grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
292	}
293	return conn
294}
295