1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19//go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto
20
21/*
Package benchmark implements the building blocks to set up end-to-end gRPC benchmarks.
23*/
24package benchmark
25
26import (
27	"fmt"
28	"io"
29	"net"
30	"sync"
31	"testing"
32	"time"
33
34	"golang.org/x/net/context"
35	"google.golang.org/grpc"
36	testpb "google.golang.org/grpc/benchmark/grpc_testing"
37	"google.golang.org/grpc/benchmark/latency"
38	"google.golang.org/grpc/benchmark/stats"
39	"google.golang.org/grpc/grpclog"
40)
41
// AddOne increments features in place, starting from its last element.
//
// Each visited element is incremented by one. If the incremented value divided
// by the entry of featuresMaxPosition at the same index is zero, the function
// stops there. Otherwise the element is reduced modulo that entry and the
// increment carries over to the preceding element. A carry out of the first
// element is dropped.
//
// featuresMaxPosition must have at least as many entries as features, and the
// entries that are visited must be non-zero.
func AddOne(features []int, featuresMaxPosition []int) {
	for idx := len(features) - 1; idx >= 0; idx-- {
		features[idx]++
		if quotient := features[idx] / featuresMaxPosition[idx]; quotient == 0 {
			// No overflow at this position, so nothing carries further.
			return
		}
		features[idx] %= featuresMaxPosition[idx]
	}
}
52
53// Allows reuse of the same testpb.Payload object.
54func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
55	if size < 0 {
56		grpclog.Fatalf("Requested a response with invalid length %d", size)
57	}
58	body := make([]byte, size)
59	switch t {
60	case testpb.PayloadType_COMPRESSABLE:
61	case testpb.PayloadType_UNCOMPRESSABLE:
62		grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
63	default:
64		grpclog.Fatalf("Unsupported payload type: %d", t)
65	}
66	p.Type = t
67	p.Body = body
68	return
69}
70
71func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
72	p := new(testpb.Payload)
73	setPayload(p, t, size)
74	return p
75}
76
// testServer is the benchmark service implementation that StartServer
// registers for the "protobuf" server type. It carries no state.
type testServer struct {
}
79
80func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
81	return &testpb.SimpleResponse{
82		Payload: newPayload(in.ResponseType, int(in.ResponseSize)),
83	}, nil
84}
85
86func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
87	response := &testpb.SimpleResponse{
88		Payload: new(testpb.Payload),
89	}
90	in := new(testpb.SimpleRequest)
91	for {
92		// use ServerStream directly to reuse the same testpb.SimpleRequest object
93		err := stream.(grpc.ServerStream).RecvMsg(in)
94		if err == io.EOF {
95			// read done.
96			return nil
97		}
98		if err != nil {
99			return err
100		}
101		setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
102		if err := stream.Send(response); err != nil {
103			return err
104		}
105	}
106}
107
// byteBufServer is a gRPC server that sends and receives byte buffers.
// The purpose is to benchmark the gRPC performance without protobuf
// serialization/deserialization overhead.
type byteBufServer struct {
	// respSize is the length, in bytes, of the buffer StreamingCall sends
	// back for every message it receives.
	respSize int32
}
113
114// UnaryCall is an empty function and is not used for benchmark.
115// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
116func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
117	return &testpb.SimpleResponse{}, nil
118}
119
120func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
121	for {
122		var in []byte
123		err := stream.(grpc.ServerStream).RecvMsg(&in)
124		if err == io.EOF {
125			return nil
126		}
127		if err != nil {
128			return err
129		}
130		out := make([]byte, s.respSize)
131		if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
132			return err
133		}
134	}
135}
136
// ServerInfo contains the information to create a gRPC benchmark server.
type ServerInfo struct {
	// Addr is the address of the server. StartServer listens on it over TCP.
	Addr string

	// Type is the type of the server.
	// It should be "protobuf" or "bytebuf"; StartServer treats any other
	// value as a fatal error.
	Type string

	// Metadata is an optional configuration.
	// For "protobuf", it's ignored.
	// For "bytebuf", it should be an int32 giving the response size in bytes.
	Metadata interface{}

	// Network can simulate latency. When it is non-nil, StartServer wraps
	// the server's listener with it.
	Network *latency.Network
}
154
155// StartServer starts a gRPC server serving a benchmark service according to info.
156// It returns its listen address and a function to stop the server.
157func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) {
158	lis, err := net.Listen("tcp", info.Addr)
159	if err != nil {
160		grpclog.Fatalf("Failed to listen: %v", err)
161	}
162	nw := info.Network
163	if nw != nil {
164		lis = nw.Listener(lis)
165	}
166	opts = append(opts, grpc.WriteBufferSize(128*1024))
167	opts = append(opts, grpc.ReadBufferSize(128*1024))
168	s := grpc.NewServer(opts...)
169	switch info.Type {
170	case "protobuf":
171		testpb.RegisterBenchmarkServiceServer(s, &testServer{})
172	case "bytebuf":
173		respSize, ok := info.Metadata.(int32)
174		if !ok {
175			grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
176		}
177		testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
178	default:
179		grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
180	}
181	go s.Serve(lis)
182	return lis.Addr().String(), func() {
183		s.Stop()
184	}
185}
186
187// DoUnaryCall performs an unary RPC with given stub and request and response sizes.
188func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
189	pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
190	req := &testpb.SimpleRequest{
191		ResponseType: pl.Type,
192		ResponseSize: int32(respSize),
193		Payload:      pl,
194	}
195	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
196		return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
197	}
198	return nil
199}
200
201// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
202func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
203	pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
204	req := &testpb.SimpleRequest{
205		ResponseType: pl.Type,
206		ResponseSize: int32(respSize),
207		Payload:      pl,
208	}
209	if err := stream.Send(req); err != nil {
210		return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
211	}
212	if _, err := stream.Recv(); err != nil {
213		// EOF is a valid error here.
214		if err == io.EOF {
215			return nil
216		}
217		return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
218	}
219	return nil
220}
221
222// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
223func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
224	out := make([]byte, reqSize)
225	if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
226		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
227	}
228	var in []byte
229	if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
230		// EOF is a valid error here.
231		if err == io.EOF {
232			return nil
233		}
234		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
235	}
236	return nil
237}
238
239// NewClientConn creates a gRPC client connection to addr.
240func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
241	opts = append(opts, grpc.WithWriteBufferSize(128*1024))
242	opts = append(opts, grpc.WithReadBufferSize(128*1024))
243	conn, err := grpc.Dial(addr, opts...)
244	if err != nil {
245		grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
246	}
247	return conn
248}
249
250func runUnary(b *testing.B, benchFeatures stats.Features) {
251	s := stats.AddStats(b, 38)
252	nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
253	target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
254	defer stopper()
255	conn := NewClientConn(
256		target, grpc.WithInsecure(),
257		grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
258			return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
259		}),
260	)
261	tc := testpb.NewBenchmarkServiceClient(conn)
262
263	// Warm up connection.
264	for i := 0; i < 10; i++ {
265		unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
266	}
267	ch := make(chan int, benchFeatures.MaxConcurrentCalls*4)
268	var (
269		mu sync.Mutex
270		wg sync.WaitGroup
271	)
272	wg.Add(benchFeatures.MaxConcurrentCalls)
273
274	// Distribute the b.N calls over maxConcurrentCalls workers.
275	for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
276		go func() {
277			for range ch {
278				start := time.Now()
279				unaryCaller(tc, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
280				elapse := time.Since(start)
281				mu.Lock()
282				s.Add(elapse)
283				mu.Unlock()
284			}
285			wg.Done()
286		}()
287	}
288	b.ResetTimer()
289	for i := 0; i < b.N; i++ {
290		ch <- i
291	}
292	close(ch)
293	wg.Wait()
294	b.StopTimer()
295	conn.Close()
296}
297
298func runStream(b *testing.B, benchFeatures stats.Features) {
299	s := stats.AddStats(b, 38)
300	nw := &latency.Network{Kbps: benchFeatures.Kbps, Latency: benchFeatures.Latency, MTU: benchFeatures.Mtu}
301	target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf", Network: nw}, grpc.MaxConcurrentStreams(uint32(benchFeatures.MaxConcurrentCalls+1)))
302	defer stopper()
303	conn := NewClientConn(
304		target, grpc.WithInsecure(),
305		grpc.WithDialer(func(address string, timeout time.Duration) (net.Conn, error) {
306			return nw.TimeoutDialer(net.DialTimeout)("tcp", address, timeout)
307		}),
308	)
309	tc := testpb.NewBenchmarkServiceClient(conn)
310
311	// Warm up connection.
312	stream, err := tc.StreamingCall(context.Background())
313	if err != nil {
314		b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
315	}
316	for i := 0; i < 10; i++ {
317		streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
318	}
319
320	ch := make(chan struct{}, benchFeatures.MaxConcurrentCalls*4)
321	var (
322		mu sync.Mutex
323		wg sync.WaitGroup
324	)
325	wg.Add(benchFeatures.MaxConcurrentCalls)
326
327	// Distribute the b.N calls over maxConcurrentCalls workers.
328	for i := 0; i < benchFeatures.MaxConcurrentCalls; i++ {
329		stream, err := tc.StreamingCall(context.Background())
330		if err != nil {
331			b.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
332		}
333		go func() {
334			for range ch {
335				start := time.Now()
336				streamCaller(stream, benchFeatures.ReqSizeBytes, benchFeatures.RespSizeBytes)
337				elapse := time.Since(start)
338				mu.Lock()
339				s.Add(elapse)
340				mu.Unlock()
341			}
342			wg.Done()
343		}()
344	}
345	b.ResetTimer()
346	for i := 0; i < b.N; i++ {
347		ch <- struct{}{}
348	}
349	close(ch)
350	wg.Wait()
351	b.StopTimer()
352	conn.Close()
353}
354func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
355	if err := DoUnaryCall(client, reqSize, respSize); err != nil {
356		grpclog.Fatalf("DoUnaryCall failed: %v", err)
357	}
358}
359
360func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
361	if err := DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
362		grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
363	}
364}
365