/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
Package main provides a benchmark whose behavior is configured via command-line flags.

An example invocation that runs several benchmarks with profiling enabled:

go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
  -compression=gzip -maxConcurrentCalls=1 -trace=off \
  -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
  -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result

As a suggestion, when creating a branch, you can run this benchmark and save the result
file with "-resultFile=basePerf". Later, while the work is in progress or once it is
finished, you can run the benchmark again and compare the result with that baseline at
any time.

Assume there are two result files named "basePerf" and "curPerf", created by passing
-resultFile=basePerf and -resultFile=curPerf respectively.
	To format curPerf, run:
  	go run benchmark/benchresult/main.go curPerf
	To observe how the performance changed relative to the base result, run:
  	go run benchmark/benchresult/main.go basePerf curPerf
*/
package main

import (
	"context"
	"encoding/gob"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net"
	"os"
	"reflect"
	"runtime"
	"runtime/pprof"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/benchmark"
	bm "google.golang.org/grpc/benchmark"
	"google.golang.org/grpc/benchmark/flags"
	"google.golang.org/grpc/benchmark/latency"
	"google.golang.org/grpc/benchmark/stats"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/test/bufconn"

	testgrpc "google.golang.org/grpc/interop/grpc_testing"
	testpb "google.golang.org/grpc/interop/grpc_testing"
)

var (
	workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
		fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
	traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
		fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
		fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
		fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
		fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
	networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
		"Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
	readLatency           = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
	readKbps              = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
	readMTU               = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
	maxConcurrentCalls    = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
	readReqSizeBytes      = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list")
	readRespSizeBytes     = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
	reqPayloadCurveFiles  = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape of a random distribution of request payload sizes")
	respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape of a random distribution of response payload sizes")
	benchTime             = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
	memProfile            = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
	memProfileRate        = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
		"memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
		"set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
	cpuProfile          = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
	benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
	useBufconn          = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
	enableKeepalive     = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
		"Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")

	logger = grpclog.Component("benchmark")
)

const (
	workloadsUnary         = "unary"
	workloadsStreaming     = "streaming"
	workloadsUnconstrained = "unconstrained"
	workloadsAll           = "all"
	// Compression modes.
	compModeOff  = "off"
	compModeGzip = "gzip"
	compModeNop  = "nop"
	compModeAll  = "all"
	// Toggle modes.
	toggleModeOff  = "off"
	toggleModeOn   = "on"
	toggleModeBoth = "both"
	// Network modes.
	networkModeNone  = "none"
	networkModeLocal = "Local"
	networkModeLAN   = "LAN"
	networkModeWAN   = "WAN"
	networkLongHaul  = "Longhaul"

	numStatsBuckets = 10
	warmupCallCount = 10
	warmuptime      = time.Second
)

var (
	allWorkloads              = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
	allCompModes              = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
	allToggleModes            = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
	allNetworkModes           = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
	defaultReadLatency        = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
	defaultReadKbps           = []int{0, 10240}                           // if non-positive, infinite
	defaultReadMTU            = []int{0}                                  // if non-positive, infinite
	defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
	defaultReqSizeBytes       = []int{1, 1024, 1024 * 1024}
	defaultRespSizeBytes      = []int{1, 1024, 1024 * 1024}
	networks                  = map[string]latency.Network{
		networkModeLocal: latency.Local,
		networkModeLAN:   latency.LAN,
		networkModeWAN:   latency.WAN,
		networkLongHaul:  latency.Longhaul,
	}
	keepaliveTime    = 10 * time.Second
	keepaliveTimeout = 1 * time.Second
	// This is 0.8*keepaliveTime to prevent connection issues because of server
	// keepalive enforcement.
	keepaliveMinTime = 8 * time.Second
)

// runModes indicates the workloads to run. This is initialized with a call to
// `runModesFromWorkloads`, passing the workloads flag set by the user.
type runModes struct {
	unary, streaming, unconstrained bool
}

// runModesFromWorkloads determines the runModes based on the value of the
// workloads flag set by the user.
func runModesFromWorkloads(workload string) runModes {
	r := runModes{}
	switch workload {
	case workloadsUnary:
		r.unary = true
	case workloadsStreaming:
		r.streaming = true
	case workloadsUnconstrained:
		r.unconstrained = true
	case workloadsAll:
		r.unary = true
		r.streaming = true
		r.unconstrained = true
	default:
		log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
			workload, strings.Join(allWorkloads, ", "))
	}
	return r
}

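// The following function types are implemented by the benchmark runners
// below: the start/stop hooks bracket a run for stats collection, and the
// rpc* functions wrap a single RPC (or one send/recv half of a stream) on
// the connection at the given position.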
type startFunc func(mode string, bf stats.Features)
type stopFunc func(count uint64)
type ucStopFunc func(req uint64, resp uint64)
type rpcCallFunc func(pos int)
type rpcSendFunc func(pos int)
type rpcRecvFunc func(pos int)
type rpcCleanupFunc func()

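// unaryBenchmark runs a benchmark of unary RPCs with the features described
// by bf, recording per-call latencies into s.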
func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
	caller, cleanup := makeFuncUnary(bf)
	defer cleanup()
	runBenchmark(caller, start, stop, bf, s, workloadsUnary)
}

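// streamBenchmark runs a benchmark of streaming RPC round trips with the
// features described by bf, recording per-call latencies into s.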
func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
	caller, cleanup := makeFuncStream(bf)
	defer cleanup()
	runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
}

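// unconstrainedStreamBenchmark runs a benchmark in which senders and
// receivers push as many messages as possible on each stream for the
// configured duration, and reports the total number of requests sent and
// responses received.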
func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) {
	var sender rpcSendFunc
	var recver rpcRecvFunc
	var cleanup rpcCleanupFunc
	if bf.EnablePreloader {
		sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
	} else {
		sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
	}
	defer cleanup()

	var req, resp uint64
	go func() {
		// Resets the counters once warmed up
		<-time.NewTimer(warmuptime).C
		atomic.StoreUint64(&req, 0)
		atomic.StoreUint64(&resp, 0)
		start(workloadsUnconstrained, bf)
	}()

	bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
	var wg sync.WaitGroup
	wg.Add(2 * bf.MaxConcurrentCalls)
	for i := 0; i < bf.MaxConcurrentCalls; i++ {
		go func(pos int) {
			defer wg.Done()
			for {
				t := time.Now()
				if t.After(bmEnd) {
					return
				}
				sender(pos)
				atomic.AddUint64(&req, 1)
			}
		}(i)
		go func(pos int) {
			defer wg.Done()
			for {
				t := time.Now()
				if t.After(bmEnd) {
					return
				}
				recver(pos)
				atomic.AddUint64(&resp, 1)
			}
		}(i)
	}
	wg.Wait()
	stop(req, resp)
}

// makeClient returns a gRPC client for the grpc.testing.BenchmarkService
// service. The client is configured using the different options in the passed
// 'bf'. Also returns a cleanup function to close the client and release
// resources.
func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
	nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
	opts := []grpc.DialOption{}
	sopts := []grpc.ServerOption{}
	if bf.ModeCompressor == compModeNop {
		sopts = append(sopts,
			grpc.RPCCompressor(nopCompressor{}),
			grpc.RPCDecompressor(nopDecompressor{}),
		)
		opts = append(opts,
			grpc.WithCompressor(nopCompressor{}),
			grpc.WithDecompressor(nopDecompressor{}),
		)
	}
	if bf.ModeCompressor == compModeGzip {
		sopts = append(sopts,
			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
		)
		opts = append(opts,
			grpc.WithCompressor(grpc.NewGZIPCompressor()),
			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
		)
	}
	if bf.EnableKeepalive {
		sopts = append(sopts,
			grpc.KeepaliveParams(keepalive.ServerParameters{
				Time:    keepaliveTime,
				Timeout: keepaliveTimeout,
			}),
			grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
				MinTime:             keepaliveMinTime,
				PermitWithoutStream: true,
			}),
		)
		opts = append(opts,
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:                keepaliveTime,
				Timeout:             keepaliveTimeout,
				PermitWithoutStream: true,
			}),
		)
	}
	sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
	opts = append(opts, grpc.WithInsecure())

	var lis net.Listener
	if bf.UseBufConn {
		bcLis := bufconn.Listen(256 * 1024)
		lis = bcLis
		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
			return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
				return bcLis.Dial()
			})(ctx, "", "")
		}))
	} else {
		var err error
		lis, err = net.Listen("tcp", "localhost:0")
		if err != nil {
			logger.Fatalf("Failed to listen: %v", err)
		}
		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
			return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String())
		}))
	}
	lis = nw.Listener(lis)
	stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
	conn := bm.NewClientConn("" /* target not used */, opts...)
	return testgrpc.NewBenchmarkServiceClient(conn), func() {
		conn.Close()
		stopper()
	}
}

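// makeFuncUnary returns a function which performs a single unary RPC using
// the request and response sizes from bf (or its payload curves, if set),
// along with a cleanup function to tear down the client and server.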
func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
	tc, cleanup := makeClient(bf)
	return func(int) {
		reqSizeBytes := bf.ReqSizeBytes
		respSizeBytes := bf.RespSizeBytes
		if bf.ReqPayloadCurve != nil {
			reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
		}
		if bf.RespPayloadCurve != nil {
			respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
		}
		unaryCaller(tc, reqSizeBytes, respSizeBytes)
	}, cleanup
}

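// makeFuncStream opens bf.MaxConcurrentCalls streaming RPCs and returns a
// function which performs one round trip on the stream at the given
// position, along with a cleanup function.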
func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
	tc, cleanup := makeClient(bf)

	streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
	for i := 0; i < bf.MaxConcurrentCalls; i++ {
		stream, err := tc.StreamingCall(context.Background())
		if err != nil {
			logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
		}
		streams[i] = stream
	}

	return func(pos int) {
		reqSizeBytes := bf.ReqSizeBytes
		respSizeBytes := bf.RespSizeBytes
		if bf.ReqPayloadCurve != nil {
			reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
		}
		if bf.RespPayloadCurve != nil {
			respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
		}
		streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
	}, cleanup
}

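// makeFuncUnconstrainedStreamPreloaded is like makeFuncUnconstrainedStream,
// but sends a pre-encoded *grpc.PreparedMsg so per-message marshaling is
// skipped on the send path.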
func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
	streams, req, cleanup := setupUnconstrainedStream(bf)

	preparedMsg := make([]*grpc.PreparedMsg, len(streams))
	for i, stream := range streams {
		preparedMsg[i] = &grpc.PreparedMsg{}
		err := preparedMsg[i].Encode(stream, req)
		if err != nil {
			logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], stream, req, err)
		}
	}

	return func(pos int) {
			streams[pos].SendMsg(preparedMsg[pos])
		}, func(pos int) {
			streams[pos].Recv()
		}, cleanup
}

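// makeFuncUnconstrainedStream returns independent send and receive functions
// operating on the stream at the given position, along with a cleanup
// function.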
func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
	streams, req, cleanup := setupUnconstrainedStream(bf)

	return func(pos int) {
			streams[pos].Send(req)
		}, func(pos int) {
			streams[pos].Recv()
		}, cleanup
}

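// setupUnconstrainedStream creates bf.MaxConcurrentCalls streaming RPCs with
// the unconstrained streaming header set, and builds the request message
// used on all of them.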
func setupUnconstrainedStream(bf stats.Features) ([]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
	tc, cleanup := makeClient(bf)

	streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
	md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1")
	ctx := metadata.NewOutgoingContext(context.Background(), md)
	for i := 0; i < bf.MaxConcurrentCalls; i++ {
		stream, err := tc.StreamingCall(ctx)
		if err != nil {
			logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
		}
		streams[i] = stream
	}

	pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
	req := &testpb.SimpleRequest{
		ResponseType: pl.Type,
		ResponseSize: int32(bf.RespSizeBytes),
		Payload:      pl,
	}

	return streams, req, cleanup
}

// unaryCaller makes a UnaryCall gRPC request using the given
// BenchmarkServiceClient and request and response sizes.
func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int) {
	if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
		logger.Fatalf("DoUnaryCall failed: %v", err)
	}
}

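// streamCaller performs a single request/response round trip on the given
// stream using the provided request and response sizes.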
func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
	if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
		logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
	}
}

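// runBenchmark warms up the connection, then issues RPCs from
// bf.MaxConcurrentCalls goroutines for the configured benchmark duration,
// recording the latency of every call into s.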
func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
	// Warm up connection.
	for i := 0; i < warmupCallCount; i++ {
		caller(0)
	}

	// Run benchmark.
	start(mode, bf)
	var wg sync.WaitGroup
	wg.Add(bf.MaxConcurrentCalls)
	bmEnd := time.Now().Add(bf.BenchTime)
	var count uint64
	for i := 0; i < bf.MaxConcurrentCalls; i++ {
		go func(pos int) {
			defer wg.Done()
			for {
				t := time.Now()
				if t.After(bmEnd) {
					return
				}
				start := time.Now()
				caller(pos)
				elapse := time.Since(start)
				atomic.AddUint64(&count, 1)
				s.AddDuration(elapse)
			}
		}(i)
	}
	wg.Wait()
	stop(count)
}

// benchOpts represents all configurable options available while running this
// benchmark. This is built from the values passed as flags.
type benchOpts struct {
	rModes              runModes
	benchTime           time.Duration
	memProfileRate      int
	memProfile          string
	cpuProfile          string
	networkMode         string
	benchmarkResultFile string
	useBufconn          bool
	enableKeepalive     bool
	features            *featureOpts
}

// featureOpts represents options which can have multiple values. The user
// usually provides a comma-separated list of options for each of these
// features through command line flags. We generate all possible combinations
// for the provided values and run the benchmarks for each combination.
type featureOpts struct {
	enableTrace        []bool
	readLatencies      []time.Duration
	readKbps           []int
	readMTU            []int
	maxConcurrentCalls []int
	reqSizeBytes       []int
	respSizeBytes      []int
	reqPayloadCurves   []*stats.PayloadCurve
	respPayloadCurves  []*stats.PayloadCurve
	compModes          []string
	enableChannelz     []bool
	enablePreloader    []bool
}

// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
// element of the slice (indexed by the 'featureIndex' enum) contains the
// number of values to be exercised for that feature by the benchmark code.
// For example: index 0 of the returned slice contains the number of values for
// the enableTrace feature, while index 1 contains the number of values for the
// readLatencies feature, and so on.
func makeFeaturesNum(b *benchOpts) []int {
	featuresNum := make([]int, stats.MaxFeatureIndex)
	for i := 0; i < len(featuresNum); i++ {
		switch stats.FeatureIndex(i) {
		case stats.EnableTraceIndex:
			featuresNum[i] = len(b.features.enableTrace)
		case stats.ReadLatenciesIndex:
			featuresNum[i] = len(b.features.readLatencies)
		case stats.ReadKbpsIndex:
			featuresNum[i] = len(b.features.readKbps)
		case stats.ReadMTUIndex:
			featuresNum[i] = len(b.features.readMTU)
		case stats.MaxConcurrentCallsIndex:
			featuresNum[i] = len(b.features.maxConcurrentCalls)
		case stats.ReqSizeBytesIndex:
			featuresNum[i] = len(b.features.reqSizeBytes)
		case stats.RespSizeBytesIndex:
			featuresNum[i] = len(b.features.respSizeBytes)
		case stats.ReqPayloadCurveIndex:
			featuresNum[i] = len(b.features.reqPayloadCurves)
		case stats.RespPayloadCurveIndex:
			featuresNum[i] = len(b.features.respPayloadCurves)
		case stats.CompModesIndex:
			featuresNum[i] = len(b.features.compModes)
		case stats.EnableChannelzIndex:
			featuresNum[i] = len(b.features.enableChannelz)
		case stats.EnablePreloaderIndex:
			featuresNum[i] = len(b.features.enablePreloader)
		default:
			log.Fatalf("Unknown feature index %v in makeFeaturesNum. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
		}
	}
	return featuresNum
}

// sharedFeatures returns a bool slice which acts as a bitmask. Each item in
// the slice represents a feature, indexed by the 'featureIndex' enum. The bit
// is set to 1 if the corresponding feature does not have multiple values, and
// is therefore shared amongst all benchmarks.
func sharedFeatures(featuresNum []int) []bool {
	result := make([]bool, len(featuresNum))
	for i, num := range featuresNum {
		if num <= 1 {
			result[i] = true
		}
	}
	return result
}

// generateFeatures generates all combinations of the provided feature options.
// While all the feature options are stored in the benchOpts struct, the input
// parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing
// the number of values for each feature.
// For example, if the user sets -workloads=all and -maxConcurrentCalls=1,100,
// this would end up with the following combinations:
// [workloads: unary, maxConcurrentCalls=1]
// [workloads: unary, maxConcurrentCalls=100]
// [workloads: streaming, maxConcurrentCalls=1]
// [workloads: streaming, maxConcurrentCalls=100]
// [workloads: unconstrained, maxConcurrentCalls=1]
// [workloads: unconstrained, maxConcurrentCalls=100]
func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
	// curPos and initialPos are two slices where each value acts as an index
	// into the appropriate feature slice maintained in benchOpts.features. This
	// loop generates all possible combinations of features by changing one value
	// at a time, and once curPos becomes equal to initialPos, we have explored
	// all options.
	var result []stats.Features
	var curPos []int
	initialPos := make([]int, stats.MaxFeatureIndex)
	for !reflect.DeepEqual(initialPos, curPos) {
		if curPos == nil {
			curPos = make([]int, stats.MaxFeatureIndex)
		}
		f := stats.Features{
			// These features stay the same for each iteration.
			NetworkMode:     b.networkMode,
			UseBufConn:      b.useBufconn,
			EnableKeepalive: b.enableKeepalive,
			BenchTime:       b.benchTime,
			// These features can potentially change for each iteration.
			EnableTrace:        b.features.enableTrace[curPos[stats.EnableTraceIndex]],
			Latency:            b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
			Kbps:               b.features.readKbps[curPos[stats.ReadKbpsIndex]],
			MTU:                b.features.readMTU[curPos[stats.ReadMTUIndex]],
			MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
			ModeCompressor:     b.features.compModes[curPos[stats.CompModesIndex]],
			EnableChannelz:     b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
			EnablePreloader:    b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
		}
		if len(b.features.reqPayloadCurves) == 0 {
			f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
		} else {
			f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]]
		}
		if len(b.features.respPayloadCurves) == 0 {
			f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]]
		} else {
			f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
		}
		result = append(result, f)
		addOne(curPos, featuresNum)
	}
	return result
}

// addOne mutates the input slice 'features' by changing one feature, thus
// arriving at the next combination of feature values. 'featuresMaxPosition'
// provides the numbers of allowed values for each feature, indexed by
// 'featureIndex' enum.
func addOne(features []int, featuresMaxPosition []int) {
	for i := len(features) - 1; i >= 0; i-- {
		if featuresMaxPosition[i] == 0 {
			continue
		}
		features[i] = (features[i] + 1)
		if features[i]/featuresMaxPosition[i] == 0 {
			break
		}
		features[i] = features[i] % featuresMaxPosition[i]
	}
}

// processFlags reads the command line flags and builds benchOpts. Specifying
// invalid values for certain flags will cause flag.Parse() to fail, and the
// program to terminate.
// This *SHOULD* be the only place where the flags are accessed. All other
// parts of the benchmark code should rely on the returned benchOpts.
func processFlags() *benchOpts {
	flag.Parse()
	if flag.NArg() != 0 {
		log.Fatal("Error: unparsed arguments: ", flag.Args())
	}

	opts := &benchOpts{
		rModes:              runModesFromWorkloads(*workloads),
		benchTime:           *benchTime,
		memProfileRate:      *memProfileRate,
		memProfile:          *memProfile,
		cpuProfile:          *cpuProfile,
		networkMode:         *networkMode,
		benchmarkResultFile: *benchmarkResultFile,
		useBufconn:          *useBufconn,
		enableKeepalive:     *enableKeepalive,
		features: &featureOpts{
			enableTrace:        setToggleMode(*traceMode),
			readLatencies:      append([]time.Duration(nil), *readLatency...),
			readKbps:           append([]int(nil), *readKbps...),
			readMTU:            append([]int(nil), *readMTU...),
			maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
			reqSizeBytes:       append([]int(nil), *readReqSizeBytes...),
			respSizeBytes:      append([]int(nil), *readRespSizeBytes...),
			compModes:          setCompressorMode(*compressorMode),
			enableChannelz:     setToggleMode(*channelzOn),
			enablePreloader:    setToggleMode(*preloaderMode),
		},
	}

	if len(*reqPayloadCurveFiles) == 0 {
		if len(opts.features.reqSizeBytes) == 0 {
			opts.features.reqSizeBytes = defaultReqSizeBytes
		}
	} else {
		if len(opts.features.reqSizeBytes) != 0 {
			log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
		}
		for _, file := range *reqPayloadCurveFiles {
			pc, err := stats.NewPayloadCurve(file)
			if err != nil {
				log.Fatalf("cannot load payload curve file %s: %v", file, err)
			}
			opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc)
		}
		opts.features.reqSizeBytes = nil
	}
	if len(*respPayloadCurveFiles) == 0 {
		if len(opts.features.respSizeBytes) == 0 {
			opts.features.respSizeBytes = defaultRespSizeBytes
		}
	} else {
		if len(opts.features.respSizeBytes) != 0 {
			log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
		}
		for _, file := range *respPayloadCurveFiles {
			pc, err := stats.NewPayloadCurve(file)
			if err != nil {
				log.Fatalf("cannot load payload curve file %s: %v", file, err)
			}
			opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc)
		}
		opts.features.respSizeBytes = nil
	}

	// Re-write latency, kbps and mtu if network mode is set.
	if network, ok := networks[opts.networkMode]; ok {
		opts.features.readLatencies = []time.Duration{network.Latency}
		opts.features.readKbps = []int{network.Kbps}
		opts.features.readMTU = []int{network.MTU}
	}
	return opts
}

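// setToggleMode converts a toggle-mode flag value into the list of boolean
// settings to benchmark ("both" expands to off and on).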
func setToggleMode(val string) []bool {
	switch val {
	case toggleModeOn:
		return []bool{true}
	case toggleModeOff:
		return []bool{false}
	case toggleModeBoth:
		return []bool{false, true}
	default:
		// This should never happen because a wrong value passed to this flag would
		// be caught during flag.Parse().
		return []bool{}
	}
}

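// setCompressorMode converts a compression-mode flag value into the list of
// compressor settings to benchmark ("all" expands to every supported mode).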
func setCompressorMode(val string) []string {
	switch val {
	case compModeNop, compModeGzip, compModeOff:
		return []string{val}
	case compModeAll:
		return []string{compModeNop, compModeGzip, compModeOff}
	default:
		// This should never happen because a wrong value passed to this flag would
		// be caught during flag.Parse().
		return []string{}
	}
}

func main() {
	opts := processFlags()
	before(opts)

	s := stats.NewStats(numStatsBuckets)
	featuresNum := makeFeaturesNum(opts)
	sf := sharedFeatures(featuresNum)

	var (
		start  = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
		stop   = func(count uint64) { s.EndRun(count) }
		ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
	)

	for _, bf := range opts.generateFeatures(featuresNum) {
		grpc.EnableTracing = bf.EnableTrace
		if bf.EnableChannelz {
			channelz.TurnOn()
		}
		if opts.rModes.unary {
			unaryBenchmark(start, stop, bf, s)
		}
		if opts.rModes.streaming {
			streamBenchmark(start, stop, bf, s)
		}
		if opts.rModes.unconstrained {
			unconstrainedStreamBenchmark(start, ucStop, bf, s)
		}
	}
	after(opts, s.GetResults())
}

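// before sets up profiling as requested: it applies the memory profiling
// rate and, if a CPU profile was requested, starts writing it to the given
// file.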
func before(opts *benchOpts) {
	if opts.memProfile != "" {
		runtime.MemProfileRate = opts.memProfileRate
	}
	if opts.cpuProfile != "" {
		f, err := os.Create(opts.cpuProfile)
		if err != nil {
			fmt.Fprintf(os.Stderr, "testing: %s\n", err)
			return
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
			f.Close()
			return
		}
	}
}

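// after tears down profiling (stopping the CPU profile and writing the heap
// profile, if requested) and writes the gob-encoded benchmark results to the
// result file, if one was specified.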
func after(opts *benchOpts, data []stats.BenchResults) {
	if opts.cpuProfile != "" {
		pprof.StopCPUProfile() // flushes profile to disk
	}
	if opts.memProfile != "" {
		f, err := os.Create(opts.memProfile)
		if err != nil {
			fmt.Fprintf(os.Stderr, "testing: %s\n", err)
			os.Exit(2)
		}
		runtime.GC() // materialize all statistics
		if err = pprof.WriteHeapProfile(f); err != nil {
			fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
			os.Exit(2)
		}
		f.Close()
	}
	if opts.benchmarkResultFile != "" {
		f, err := os.Create(opts.benchmarkResultFile)
		if err != nil {
			log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
		}
		dataEncoder := gob.NewEncoder(f)
		dataEncoder.Encode(data)
		f.Close()
	}
}

// nopCompressor is a compressor that just copies data.
type nopCompressor struct{}

func (nopCompressor) Do(w io.Writer, p []byte) error {
	n, err := w.Write(p)
	if err != nil {
		return err
	}
	if n != len(p) {
		return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p))
	}
	return nil
}

func (nopCompressor) Type() string { return compModeNop }

// nopDecompressor is a decompressor that just copies data.
type nopDecompressor struct{}

func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) }
func (nopDecompressor) Type() string                   { return compModeNop }