1/*
2 *
3 * Copyright 2014 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19/*
Package benchmark implements the building blocks to set up end-to-end gRPC benchmarks.
21*/
22package benchmark
23
24import (
25	"context"
26	"fmt"
27	"io"
28	"log"
29	"net"
30
31	"google.golang.org/grpc"
32	"google.golang.org/grpc/codes"
33	"google.golang.org/grpc/grpclog"
34	"google.golang.org/grpc/metadata"
35	"google.golang.org/grpc/status"
36
37	testgrpc "google.golang.org/grpc/interop/grpc_testing"
38	testpb "google.golang.org/grpc/interop/grpc_testing"
39)
40
// logger is the package-level grpclog component logger, registered under the
// component name "benchmark".
var logger = grpclog.Component("benchmark")
42
43// Allows reuse of the same testpb.Payload object.
44func setPayload(p *testpb.Payload, t testpb.PayloadType, size int) {
45	if size < 0 {
46		logger.Fatalf("Requested a response with invalid length %d", size)
47	}
48	body := make([]byte, size)
49	switch t {
50	case testpb.PayloadType_COMPRESSABLE:
51	default:
52		logger.Fatalf("Unsupported payload type: %d", t)
53	}
54	p.Type = t
55	p.Body = body
56}
57
58// NewPayload creates a payload with the given type and size.
59func NewPayload(t testpb.PayloadType, size int) *testpb.Payload {
60	p := new(testpb.Payload)
61	setPayload(p, t, size)
62	return p
63}
64
// testServer is the BenchmarkService implementation that StartServer registers
// for the "protobuf" server type. It embeds
// UnimplementedBenchmarkServiceServer and defines UnaryCall and StreamingCall
// itself.
type testServer struct {
	testgrpc.UnimplementedBenchmarkServiceServer
}
68
69func (s *testServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
70	return &testpb.SimpleResponse{
71		Payload: NewPayload(in.ResponseType, int(in.ResponseSize)),
72	}, nil
73}
74
// UnconstrainedStreamingHeader is the metadata key that indicates to the
// StreamingCall handler that its behavior should be unconstrained (constant
// send/receive in parallel) instead of ping-pong. The handler only checks that
// at least one value is present for this key; the value itself is not
// inspected.
const UnconstrainedStreamingHeader = "unconstrained-streaming"
79
80func (s *testServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
81	if md, ok := metadata.FromIncomingContext(stream.Context()); ok && len(md[UnconstrainedStreamingHeader]) != 0 {
82		return s.UnconstrainedStreamingCall(stream)
83	}
84	response := &testpb.SimpleResponse{
85		Payload: new(testpb.Payload),
86	}
87	in := new(testpb.SimpleRequest)
88	for {
89		// use ServerStream directly to reuse the same testpb.SimpleRequest object
90		err := stream.(grpc.ServerStream).RecvMsg(in)
91		if err == io.EOF {
92			// read done.
93			return nil
94		}
95		if err != nil {
96			return err
97		}
98		setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
99		if err := stream.Send(response); err != nil {
100			return err
101		}
102	}
103}
104
105func (s *testServer) UnconstrainedStreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
106	in := new(testpb.SimpleRequest)
107	// Receive a message to learn response type and size.
108	err := stream.RecvMsg(in)
109	if err == io.EOF {
110		// read done.
111		return nil
112	}
113	if err != nil {
114		return err
115	}
116
117	response := &testpb.SimpleResponse{
118		Payload: new(testpb.Payload),
119	}
120	setPayload(response.Payload, in.ResponseType, int(in.ResponseSize))
121
122	go func() {
123		for {
124			// Using RecvMsg rather than Recv to prevent reallocation of SimpleRequest.
125			err := stream.RecvMsg(in)
126			switch status.Code(err) {
127			case codes.Canceled:
128				return
129			case codes.OK:
130			default:
131				log.Fatalf("server recv error: %v", err)
132			}
133		}
134	}()
135
136	go func() {
137		for {
138			err := stream.Send(response)
139			switch status.Code(err) {
140			case codes.Unavailable:
141				return
142			case codes.OK:
143			default:
144				log.Fatalf("server send error: %v", err)
145			}
146		}
147	}()
148
149	<-stream.Context().Done()
150	return stream.Context().Err()
151}
152
// byteBufServer is a gRPC server that sends and receives byte buffers.
// The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
type byteBufServer struct {
	testgrpc.UnimplementedBenchmarkServiceServer
	// respSize is the length, in bytes, of the buffer StreamingCall sends back
	// for every message it receives.
	respSize int32
}
159
160// UnaryCall is an empty function and is not used for benchmark.
161// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
162func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
163	return &testpb.SimpleResponse{}, nil
164}
165
166func (s *byteBufServer) StreamingCall(stream testgrpc.BenchmarkService_StreamingCallServer) error {
167	for {
168		var in []byte
169		err := stream.(grpc.ServerStream).RecvMsg(&in)
170		if err == io.EOF {
171			return nil
172		}
173		if err != nil {
174			return err
175		}
176		out := make([]byte, s.respSize)
177		if err := stream.(grpc.ServerStream).SendMsg(&out); err != nil {
178			return err
179		}
180	}
181}
182
// ServerInfo contains the information to create a gRPC benchmark server.
type ServerInfo struct {
	// Type is the type of the server.
	// It should be "protobuf" or "bytebuf".
	Type string

	// Metadata is an optional configuration.
	// For "protobuf", it's ignored.
	// For "bytebuf", it must be an int32 representing the response size;
	// StartServer rejects any other dynamic type (including plain int).
	Metadata interface{}

	// Listener is the network listener for the server to use.
	Listener net.Listener
}
197
198// StartServer starts a gRPC server serving a benchmark service according to info.
199// It returns a function to stop the server.
200func StartServer(info ServerInfo, opts ...grpc.ServerOption) func() {
201	opts = append(opts, grpc.WriteBufferSize(128*1024))
202	opts = append(opts, grpc.ReadBufferSize(128*1024))
203	s := grpc.NewServer(opts...)
204	switch info.Type {
205	case "protobuf":
206		testgrpc.RegisterBenchmarkServiceServer(s, &testServer{})
207	case "bytebuf":
208		respSize, ok := info.Metadata.(int32)
209		if !ok {
210			logger.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
211		}
212		testgrpc.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
213	default:
214		logger.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
215	}
216	go s.Serve(info.Listener)
217	return func() {
218		s.Stop()
219	}
220}
221
222// DoUnaryCall performs an unary RPC with given stub and request and response sizes.
223func DoUnaryCall(tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) error {
224	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
225	req := &testpb.SimpleRequest{
226		ResponseType: pl.Type,
227		ResponseSize: int32(respSize),
228		Payload:      pl,
229	}
230	if _, err := tc.UnaryCall(context.Background(), req); err != nil {
231		return fmt.Errorf("/BenchmarkService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
232	}
233	return nil
234}
235
236// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
237func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
238	pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
239	req := &testpb.SimpleRequest{
240		ResponseType: pl.Type,
241		ResponseSize: int32(respSize),
242		Payload:      pl,
243	}
244	if err := stream.Send(req); err != nil {
245		return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
246	}
247	if _, err := stream.Recv(); err != nil {
248		// EOF is a valid error here.
249		if err == io.EOF {
250			return nil
251		}
252		return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
253	}
254	return nil
255}
256
257// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
258func DoByteBufStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
259	out := make([]byte, reqSize)
260	if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
261		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
262	}
263	var in []byte
264	if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
265		// EOF is a valid error here.
266		if err == io.EOF {
267			return nil
268		}
269		return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
270	}
271	return nil
272}
273
274// NewClientConn creates a gRPC client connection to addr.
275func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn {
276	return NewClientConnWithContext(context.Background(), addr, opts...)
277}
278
279// NewClientConnWithContext creates a gRPC client connection to addr using ctx.
280func NewClientConnWithContext(ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
281	opts = append(opts, grpc.WithWriteBufferSize(128*1024))
282	opts = append(opts, grpc.WithReadBufferSize(128*1024))
283	conn, err := grpc.DialContext(ctx, addr, opts...)
284	if err != nil {
285		logger.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err)
286	}
287	return conn
288}
289