1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19//go:generate protoc -I grpc_testing --go_out=plugins=grpc:grpc_testing grpc_testing/control.proto grpc_testing/messages.proto grpc_testing/payloads.proto grpc_testing/services.proto grpc_testing/stats.proto
20
21/*
22Package benchmark implements the building blocks to setup end-to-end gRPC benchmarks.
23*/
24package benchmark
25
26import (
27	"context"
28	"fmt"
29	"io"
30	"log"
31	"net"
32
33	"google.golang.org/grpc"
34	testpb "google.golang.org/grpc/benchmark/grpc_testing"
35	"google.golang.org/grpc/codes"
36	"google.golang.org/grpc/grpclog"
37	"google.golang.org/grpc/status"
38)
39
40// Allows reuse of the same testpb.Payload object.
41func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
42	if size < 0 {
43		grpclog.Fatalf("Requested a response with invalid length %d", size)
44	}
45	body := make([]byte, size)
46	switch t {
47	case testpb.PayloadType_COMPRESSABLE:
48	case testpb.PayloadType_UNCOMPRESSABLE:
49		grpclog.Fatalf("PayloadType UNCOMPRESSABLE is not supported")
50	default:
51		grpclog.Fatalf("Unsupported payload type: %d", t)
52	}
53	p.Type = t
54	p.Body = body
55}
56
57// NewPayload creates a payload with the given type and size.
58func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
59	p := new(testpb.Payload)
60	setPayload(p, t, size)
61	return p
62}
63
64type testServer struct {
65}
66
67func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
68	return &testpb.SimpleResponse{
69		Payload: NewPayload(in.ResponseType, int(in.ResponseSize)),
70	}, nil
71}
72
73func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
74	response := &testpb.SimpleResponse{
75		Payload: new(testpb.Payload),
76	}
77	in := new(testpb.SimpleRequest)
78	for {
79		// use ServerStream directly to reuse the same testpb.SimpleRequest object
80		err := stream.(grpc.ServerStream).RecvMsg(in)
81		if err == io.EOF {
82			// read done.
83			return nil
84		}
85		if err != nil {
86			return err
87		}
88		setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
89		if err := stream.Send(response); err != nil {
90			return err
91		}
92	}
93}
94
95func (s *testServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
96	in := new(testpb.SimpleRequest)
97	// Receive a message to learn response type and size.
98	err := stream.RecvMsg(in)
99	if err == io.EOF {
100		// read done.
101		return nil
102	}
103	if err != nil {
104		return err
105	}
106
107	response := &testpb.SimpleResponse{
108		Payload: new(testpb.Payload),
109	}
110	setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
111
112	go func() {
113		for {
114			// Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest.
115			err := stream.RecvMsg(in)
116			switch status.Code(err) {
117			case codes.Canceled:
118			case codes.OK:
119			default:
120				log.Fatalf("server recv error: %v", err)
121			}
122		}
123	}()
124
125	go func() {
126		for {
127			err := stream.Send(response)
128			switch status.Code(err) {
129			case codes.Unavailable:
130			case codes.OK:
131			default:
132				log.Fatalf("server send error: %v", err)
133			}
134		}
135	}()
136
137	<-stream.Context().Done()
138	return stream.Context().Err()
139}
140
141// byteBufServer is a gRPC server that sends and receives byte buffer.
142// The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
143type byteBufServer struct {
144	respSize int32
145}
146
147// UnaryCall is an empty function and is not used for benchmark.
148// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
149func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
150	return &testpb.SimpleResponse{}, nil
151}
152
153func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallServer) error {
154	for {
155		var in []byte
156		err := stream.(grpc.ServerStream).RecvMsg(&in)
157		if err == io.EOF {
158			return nil
159		}
160		if err != nil {
161			return err
162		}
163		out := make([]byte, s.respSize)
164		if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
165			return err
166		}
167	}
168}
169
170func (s *byteBufServer) UnconstrainedStreamingCall(stream testpb.BenchmarkService_UnconstrainedStreamingCallServer) error {
171	for {
172		var in []byte
173		err := stream.(grpc.ServerStream).RecvMsg(&in)
174		if err == io.EOF {
175			return nil
176		}
177		if err != nil {
178			return err
179		}
180		out := make([]byte, s.respSize)
181		if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
182			return err
183		}
184	}
185}
186
187// ServerInfo contains the information to create a gRPC benchmark server.
188type ServerInfo struct {
189	// Type is the type of the server.
190	// It should be "protobuf" or "bytebuf".
191	Type string
192
193	// Metadata is an optional configuration.
194	// For "protobuf", it's ignored.
195	// For "bytebuf", it should be an int representing response size.
196	Metadata interface{}
197
198	// Listener is the network listener for the server to use
199	Listener net.Listener
200}
201
202// StartServer starts a gRPC server serving a benchmark service according to info.
203// It returns a function to stop the server.
204func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
205	opts = append(opts, grpc.WriteBufferSize(128*1024))
206	opts = append(opts, grpc.ReadBufferSize(128*1024))
207	s := grpc.NewServer(opts...)
208	switch info.Type {
209	case "protobuf":
210		testpb.RegisterBenchmarkServiceServer(s, &testServer{})
211	case "bytebuf":
212		respSize, ok := info.Metadata.(int32)
213		if !ok {
214			grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
215		}
216		testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
217	default:
218		grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
219	}
220	go s.Serve(info.Listener)
221	return func() {
222		s.Stop()
223	}
224}
225
226// DoUnaryCall performs an unary RPC with given stub and request and response sizes.
227func DoUnaryCall(tc testpb.BenchmarkServiceClient, reqSize, respSize int) error {
228	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
229	req := &testpb.SimpleRequest{
230		ResponseType: pl.Type,
231		ResponseSize: int32(respSize),
232		Payload:      pl,
233	}
234	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
235		return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
236	}
237	return nil
238}
239
240// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
241func DoStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
242	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
243	req := &testpb.SimpleRequest{
244		ResponseType: pl.Type,
245		ResponseSize: int32(respSize),
246		Payload:      pl,
247	}
248	if err := stream.Send(req); err != nil {
249		return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
250	}
251	if _, err := stream.Recv(); err != nil {
252		// EOF is a valid error here.
253		if err == io.EOF {
254			return nil
255		}
256		return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
257	}
258	return nil
259}
260
261// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
262func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
263	out := make([]byte, reqSize)
264	if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
265		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
266	}
267	var in []byte
268	if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
269		// EOF is a valid error here.
270		if err == io.EOF {
271			return nil
272		}
273		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
274	}
275	return nil
276}
277
278// NewClientConn creates a gRPC client connection to addr.
279func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
280	return NewClientConnWithContext(context.Background(), addr, opts...)
281}
282
283// NewClientConnWithContext creates a gRPC client connection to addr using ctx.
284func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
285	opts = append(opts, grpc.WithWriteBufferSize(128*1024))
286	opts = append(opts, grpc.WithReadBufferSize(128*1024))
287	conn, err := grpc.DialContext(ctx, addr, opts...)
288	if err != nil {
289		grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
290	}
291	return conn
292}
293