/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
Package main provides a benchmark whose behavior is configured through
command-line flags.

An example invocation that runs several benchmarks with profiling enabled:

go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
  -compression=gzip -maxConcurrentCalls=1 -trace=off \
  -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
  -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result

As a suggestion, when creating a branch, run this benchmark and save the result
file with "-resultFile=basePerf"; later, while the work is in progress or once
it is done, run the benchmark again and compare the new result against the base
at any time.

Assume there are two result files named "basePerf" and "curPerf", created by
passing -resultFile=basePerf and -resultFile=curPerf respectively.

	To format curPerf, run:
	go run benchmark/benchresult/main.go curPerf

	To see how performance changes relative to the base result, run:
	go run benchmark/benchresult/main.go basePerf curPerf
*/
package main

import (
	"context"
	"encoding/gob"
	"flag"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"net"
	"os"
	"reflect"
	"runtime"
	"runtime/pprof"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"google.golang.org/grpc"
	bm "google.golang.org/grpc/benchmark"
	"google.golang.org/grpc/benchmark/flags"
	testpb "google.golang.org/grpc/benchmark/grpc_testing"
	"google.golang.org/grpc/benchmark/latency"
	"google.golang.org/grpc/benchmark/stats"
	"google.golang.org/grpc/grpclog"
	"google.golang.org/grpc/internal/channelz"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/test/bufconn"
)

var (
	workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
		fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
	traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
		fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
		fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
		fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
		fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
	networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
		"Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
	readLatency        = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
	readKbps           = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
	readMTU            = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
	maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
	readReqSizeBytes   = flags.IntSlice("reqSizeBytes", defaultReqSizeBytes, "Request size in bytes - may be a comma-separated list")
	readRespSizeBytes  = flags.IntSlice("respSizeBytes", defaultRespSizeBytes, "Response size in bytes - may be a comma-separated list")
	benchTime          = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
	memProfile         = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
	memProfileRate     = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
		"memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
		"set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
	cpuProfile          = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
	benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
	useBufconn          = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
	enableKeepalive     = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
		"Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
)

const (
	workloadsUnary         = "unary"
	workloadsStreaming     = "streaming"
	workloadsUnconstrained = "unconstrained"
	workloadsAll           = "all"
	// Compression modes.
	compModeOff  = "off"
	compModeGzip = "gzip"
	compModeNop  = "nop"
	compModeAll  = "all"
	// Toggle modes.
	toggleModeOff  = "off"
	toggleModeOn   = "on"
	toggleModeBoth = "both"
	// Network modes.
	networkModeNone  = "none"
	networkModeLocal = "Local"
	networkModeLAN   = "LAN"
	networkModeWAN   = "WAN"
	networkLongHaul  = "Longhaul"

	numStatsBuckets = 10
	warmupCallCount = 10
	warmuptime      = time.Second
)

var (
	allWorkloads              = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
	allCompModes              = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
	allToggleModes            = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
	allNetworkModes           = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
	defaultReadLatency        = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
	defaultReadKbps           = []int{0, 10240}                           // if non-positive, infinite
	defaultReadMTU            = []int{0}                                  // if non-positive, infinite
	defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
	defaultReqSizeBytes       = []int{1, 1024, 1024 * 1024}
	defaultRespSizeBytes      = []int{1, 1024, 1024 * 1024}
	networks                  = map[string]latency.Network{
		networkModeLocal: latency.Local,
		networkModeLAN:   latency.LAN,
		networkModeWAN:   latency.WAN,
		networkLongHaul:  latency.Longhaul,
	}
	keepaliveTime    = 10 * time.Second
	keepaliveTimeout = 1 * time.Second
	// This is 0.8*keepaliveTime to prevent connection issues because of server
	// keepalive enforcement.
	keepaliveMinTime = 8 * time.Second
)

// runModes indicates the workloads to run. This is initialized with a call to
// `runModesFromWorkloads`, passing the workloads flag set by the user.
type runModes struct {
	unary, streaming, unconstrained bool
}

// runModesFromWorkloads determines the runModes based on the value of the
// workloads flag set by the user.
func runModesFromWorkloads(workload string) runModes {
	r := runModes{}
	switch workload {
	case workloadsUnary:
		r.unary = true
	case workloadsStreaming:
		r.streaming = true
	case workloadsUnconstrained:
		r.unconstrained = true
	case workloadsAll:
		r.unary = true
		r.streaming = true
		r.unconstrained = true
	default:
		log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
			workload, strings.Join(allWorkloads, ", "))
	}
	return r
}

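// The benchmark drivers below are glued together with a small set of function
// types: startFunc/stopFunc bracket a timed run (stopFunc receives the total
// RPC count), ucStopFunc ends an unconstrained run with separate request and
// response counts, rpcCallFunc issues one RPC for the given concurrency slot,
// rpcSendFunc/rpcRecvFunc send and receive on an already established stream,
// and rpcCleanupFunc releases the client resources created for a run.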
type startFunc func(mode string, bf stats.Features)
type stopFunc func(count uint64)
type ucStopFunc func(req uint64, resp uint64)
type rpcCallFunc func(pos int)
type rpcSendFunc func(pos int)
type rpcRecvFunc func(pos int)
type rpcCleanupFunc func()

func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
	caller, cleanup := makeFuncUnary(bf)
	defer cleanup()
	runBenchmark(caller, start, stop, bf, s, workloadsUnary)
}

func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
	caller, cleanup := makeFuncStream(bf)
	defer cleanup()
	runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
}

func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) {
	var sender rpcSendFunc
	var recver rpcRecvFunc
	var cleanup rpcCleanupFunc
	if bf.EnablePreloader {
		sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
	} else {
		sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
	}
	defer cleanup()

	var req, resp uint64
	go func() {
		// Resets the counters once warmed up
		<-time.NewTimer(warmuptime).C
		atomic.StoreUint64(&req, 0)
		atomic.StoreUint64(&resp, 0)
		start(workloadsUnconstrained, bf)
	}()

	bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
	var wg sync.WaitGroup
	wg.Add(2 * bf.MaxConcurrentCalls)
	for i := 0; i < bf.MaxConcurrentCalls; i++ {
		go func(pos int) {
			defer wg.Done()
			for {
				t := time.Now()
				if t.After(bmEnd) {
					return
				}
				sender(pos)
				atomic.AddUint64(&req, 1)
			}
		}(i)
		go func(pos int) {
			defer wg.Done()
			for {
				t := time.Now()
				if t.After(bmEnd) {
					return
				}
				recver(pos)
				atomic.AddUint64(&resp, 1)
			}
		}(i)
	}
	wg.Wait()
	stop(req, resp)
}

// makeClient returns a gRPC client for the grpc.testing.BenchmarkService
// service. The client is configured using the different options in the passed
// 'bf'. Also returns a cleanup function to close the client and release
// resources.
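//
// The returned cleanup function must be invoked once the benchmark is done,
// for example:
//
//	tc, cleanup := makeClient(bf)
//	defer cleanup()
//	unaryCaller(tc, bf.ReqSizeBytes, bf.RespSizeBytes)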
func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) {
	nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
	opts := []grpc.DialOption{}
	sopts := []grpc.ServerOption{}
	if bf.ModeCompressor == compModeNop {
		sopts = append(sopts,
			grpc.RPCCompressor(nopCompressor{}),
			grpc.RPCDecompressor(nopDecompressor{}),
		)
		opts = append(opts,
			grpc.WithCompressor(nopCompressor{}),
			grpc.WithDecompressor(nopDecompressor{}),
		)
	}
	if bf.ModeCompressor == compModeGzip {
		sopts = append(sopts,
			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
		)
		opts = append(opts,
			grpc.WithCompressor(grpc.NewGZIPCompressor()),
			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
		)
	}
	if bf.EnableKeepalive {
		sopts = append(sopts,
			grpc.KeepaliveParams(keepalive.ServerParameters{
				Time:    keepaliveTime,
				Timeout: keepaliveTimeout,
			}),
			grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
				MinTime:             keepaliveMinTime,
				PermitWithoutStream: true,
			}),
		)
		opts = append(opts,
			grpc.WithKeepaliveParams(keepalive.ClientParameters{
				Time:                keepaliveTime,
				Timeout:             keepaliveTimeout,
				PermitWithoutStream: true,
			}),
		)
	}
	sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
	opts = append(opts, grpc.WithInsecure())

	var lis net.Listener
	if bf.UseBufConn {
		bcLis := bufconn.Listen(256 * 1024)
		lis = bcLis
		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
			return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
				return bcLis.Dial()
			})(ctx, "", "")
		}))
	} else {
		var err error
		lis, err = net.Listen("tcp", "localhost:0")
		if err != nil {
			grpclog.Fatalf("Failed to listen: %v", err)
		}
		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
			return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String())
		}))
	}
	lis = nw.Listener(lis)
	stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
	conn := bm.NewClientConn("" /* target not used */, opts...)
	return testpb.NewBenchmarkServiceClient(conn), func() {
		conn.Close()
		stopper()
	}
}

func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
	tc, cleanup := makeClient(bf)
	return func(int) {
		unaryCaller(tc, bf.ReqSizeBytes, bf.RespSizeBytes)
	}, cleanup
}

func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
	tc, cleanup := makeClient(bf)

	streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
	for i := 0; i < bf.MaxConcurrentCalls; i++ {
		stream, err := tc.StreamingCall(context.Background())
		if err != nil {
			grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
		}
		streams[i] = stream
	}

	return func(pos int) {
		streamCaller(streams[pos], bf.ReqSizeBytes, bf.RespSizeBytes)
	}, cleanup
}

func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
	streams, req, cleanup := setupUnconstrainedStream(bf)

	preparedMsg := make([]*grpc.PreparedMsg, len(streams))
	for i, stream := range streams {
		preparedMsg[i] = &grpc.PreparedMsg{}
		err := preparedMsg[i].Encode(stream, req)
		if err != nil {
			grpclog.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], stream, req, err)
		}
	}

	return func(pos int) {
			streams[pos].SendMsg(preparedMsg[pos])
		}, func(pos int) {
			streams[pos].Recv()
		}, cleanup
}

func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
	streams, req, cleanup := setupUnconstrainedStream(bf)

	return func(pos int) {
			streams[pos].Send(req)
		}, func(pos int) {
			streams[pos].Recv()
		}, cleanup
}

func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
	tc, cleanup := makeClient(bf)

	streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
	for i := 0; i < bf.MaxConcurrentCalls; i++ {
		stream, err := tc.UnconstrainedStreamingCall(context.Background())
		if err != nil {
			grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err)
		}
		streams[i] = stream
	}

	pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
	req := &testpb.SimpleRequest{
		ResponseType: pl.Type,
		ResponseSize: int32(bf.RespSizeBytes),
		Payload:      pl,
	}

	return streams, req, cleanup
}

// unaryCaller makes a UnaryCall RPC request using the given
// BenchmarkServiceClient with the configured request and response sizes.
func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
	if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
		grpclog.Fatalf("DoUnaryCall failed: %v", err)
	}
}

func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
	if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
		grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
	}
}

func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
	// Warm up connection.
	for i := 0; i < warmupCallCount; i++ {
		caller(0)
	}

	// Run benchmark.
	start(mode, bf)
	var wg sync.WaitGroup
	wg.Add(bf.MaxConcurrentCalls)
	bmEnd := time.Now().Add(bf.BenchTime)
	var count uint64
	for i := 0; i < bf.MaxConcurrentCalls; i++ {
		go func(pos int) {
			defer wg.Done()
			for {
				t := time.Now()
				if t.After(bmEnd) {
					return
				}
				start := time.Now()
				caller(pos)
				elapse := time.Since(start)
				atomic.AddUint64(&count, 1)
				s.AddDuration(elapse)
			}
		}(i)
	}
	wg.Wait()
	stop(count)
}

// benchOpts represents all configurable options available while running this
// benchmark. This is built from the values passed as flags.
type benchOpts struct {
	rModes              runModes
	benchTime           time.Duration
	memProfileRate      int
	memProfile          string
	cpuProfile          string
	networkMode         string
	benchmarkResultFile string
	useBufconn          bool
	enableKeepalive     bool
	features            *featureOpts
}

// featureOpts represents options which can have multiple values. The user
// usually provides a comma-separated list of options for each of these
// features through command line flags. We generate all possible combinations
// for the provided values and run the benchmarks for each combination.
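//
// For example, assuming -compression=all (three values: off, gzip and nop) and
// -maxConcurrentCalls=1,8, with every other feature left at a single value,
// 3*2 = 6 feature combinations are benchmarked for each selected workload.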
type featureOpts struct {
	enableTrace        []bool
	readLatencies      []time.Duration
	readKbps           []int
	readMTU            []int
	maxConcurrentCalls []int
	reqSizeBytes       []int
	respSizeBytes      []int
	compModes          []string
	enableChannelz     []bool
	enablePreloader    []bool
}

// makeFeaturesNum returns a slice of ints of size 'stats.MaxFeatureIndex',
// where each element (indexed by the 'FeatureIndex' enum) contains the number
// of values to be exercised for the corresponding feature.
// For example, index 0 of the returned slice contains the number of values for
// the enableTrace feature, while index 1 contains the number of values for the
// readLatencies feature, and so on.
func makeFeaturesNum(b *benchOpts) []int {
	featuresNum := make([]int, stats.MaxFeatureIndex)
	for i := 0; i < len(featuresNum); i++ {
		switch stats.FeatureIndex(i) {
		case stats.EnableTraceIndex:
			featuresNum[i] = len(b.features.enableTrace)
		case stats.ReadLatenciesIndex:
			featuresNum[i] = len(b.features.readLatencies)
		case stats.ReadKbpsIndex:
			featuresNum[i] = len(b.features.readKbps)
		case stats.ReadMTUIndex:
			featuresNum[i] = len(b.features.readMTU)
		case stats.MaxConcurrentCallsIndex:
			featuresNum[i] = len(b.features.maxConcurrentCalls)
		case stats.ReqSizeBytesIndex:
			featuresNum[i] = len(b.features.reqSizeBytes)
		case stats.RespSizeBytesIndex:
			featuresNum[i] = len(b.features.respSizeBytes)
		case stats.CompModesIndex:
			featuresNum[i] = len(b.features.compModes)
		case stats.EnableChannelzIndex:
			featuresNum[i] = len(b.features.enableChannelz)
		case stats.EnablePreloaderIndex:
			featuresNum[i] = len(b.features.enablePreloader)
		default:
			log.Fatalf("Unknown feature index %v in makeFeaturesNum. MaxFeatureIndex is %v", i, stats.MaxFeatureIndex)
		}
	}
	return featuresNum
}

// sharedFeatures returns a bool slice which acts as a bitmask. Each item in
// the slice represents a feature, indexed by the 'FeatureIndex' enum. An entry
// is set to true if the corresponding feature has only a single value, and is
// therefore shared amongst all benchmarks.
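//
// For example, featuresNum = []int{1, 3, 1} yields []bool{true, false, true}:
// the first and third features each have a single value and are shared, while
// the second feature varies from run to run.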
func sharedFeatures(featuresNum []int) []bool {
	result := make([]bool, len(featuresNum))
	for i, num := range featuresNum {
		if num <= 1 {
			result[i] = true
		}
	}
	return result
}

// generateFeatures generates all combinations of the provided feature options.
// While all the feature options are stored in the benchOpts struct, the input
// parameter 'featuresNum' is a slice indexed by the 'FeatureIndex' enum
// containing the number of values for each feature.
// For example, if the user sets -workloads=all and -maxConcurrentCalls=1,100,
// we end up with the following combinations:
// [workloads: unary, maxConcurrentCalls=1]
// [workloads: unary, maxConcurrentCalls=100]
// [workloads: streaming, maxConcurrentCalls=1]
// [workloads: streaming, maxConcurrentCalls=100]
// [workloads: unconstrained, maxConcurrentCalls=1]
// [workloads: unconstrained, maxConcurrentCalls=100]
func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
	// curPos and initialPos are two slices where each value acts as an index
	// into the appropriate feature slice maintained in benchOpts.features. This
	// loop generates all possible combinations of features by changing one value
	// at a time, and once curPos becomes equal to initialPos, we have explored
	// all options.
	var result []stats.Features
	var curPos []int
	initialPos := make([]int, stats.MaxFeatureIndex)
	for !reflect.DeepEqual(initialPos, curPos) {
		if curPos == nil {
			curPos = make([]int, stats.MaxFeatureIndex)
		}
		result = append(result, stats.Features{
			// These features stay the same for each iteration.
			NetworkMode:     b.networkMode,
			UseBufConn:      b.useBufconn,
			EnableKeepalive: b.enableKeepalive,
			BenchTime:       b.benchTime,
			// These features can potentially change for each iteration.
			EnableTrace:        b.features.enableTrace[curPos[stats.EnableTraceIndex]],
			Latency:            b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
			Kbps:               b.features.readKbps[curPos[stats.ReadKbpsIndex]],
			MTU:                b.features.readMTU[curPos[stats.ReadMTUIndex]],
			MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
			ReqSizeBytes:       b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]],
			RespSizeBytes:      b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]],
			ModeCompressor:     b.features.compModes[curPos[stats.CompModesIndex]],
			EnableChannelz:     b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
			EnablePreloader:    b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
		})
		addOne(curPos, featuresNum)
	}
	return result
}

// addOne mutates the input slice 'features' by changing one feature, thus
// arriving at the next combination of feature values. 'featuresMaxPosition'
// provides the number of allowed values for each feature, indexed by the
// 'FeatureIndex' enum.
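//
// It behaves like the increment of a mixed-radix counter, carrying from the
// right-most position. For example, with featuresMaxPosition = []int{2, 3} the
// successive values of 'features' are:
//
//	[0 0] -> [0 1] -> [0 2] -> [1 0] -> [1 1] -> [1 2] -> [0 0]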
func addOne(features []int, featuresMaxPosition []int) {
	for i := len(features) - 1; i >= 0; i-- {
		features[i] = (features[i] + 1)
		if features[i]/featuresMaxPosition[i] == 0 {
			break
		}
		features[i] = features[i] % featuresMaxPosition[i]
	}
}

// processFlags reads the command line flags and builds benchOpts. Specifying
// invalid values for certain flags will cause flag.Parse() to fail, and the
// program to terminate.
// This *SHOULD* be the only place where the flags are accessed. All other
// parts of the benchmark code should rely on the returned benchOpts.
func processFlags() *benchOpts {
	flag.Parse()
	if flag.NArg() != 0 {
		log.Fatal("Error: unparsed arguments: ", flag.Args())
	}

	opts := &benchOpts{
		rModes:              runModesFromWorkloads(*workloads),
		benchTime:           *benchTime,
		memProfileRate:      *memProfileRate,
		memProfile:          *memProfile,
		cpuProfile:          *cpuProfile,
		networkMode:         *networkMode,
		benchmarkResultFile: *benchmarkResultFile,
		useBufconn:          *useBufconn,
		enableKeepalive:     *enableKeepalive,
		features: &featureOpts{
			enableTrace:        setToggleMode(*traceMode),
			readLatencies:      append([]time.Duration(nil), *readLatency...),
			readKbps:           append([]int(nil), *readKbps...),
			readMTU:            append([]int(nil), *readMTU...),
			maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
			reqSizeBytes:       append([]int(nil), *readReqSizeBytes...),
			respSizeBytes:      append([]int(nil), *readRespSizeBytes...),
			compModes:          setCompressorMode(*compressorMode),
			enableChannelz:     setToggleMode(*channelzOn),
			enablePreloader:    setToggleMode(*preloaderMode),
		},
	}

	// Rewrite latency, kbps and mtu if a network mode is set.
	if network, ok := networks[opts.networkMode]; ok {
		opts.features.readLatencies = []time.Duration{network.Latency}
		opts.features.readKbps = []int{network.Kbps}
		opts.features.readMTU = []int{network.MTU}
	}
	return opts
}

func setToggleMode(val string) []bool {
	switch val {
	case toggleModeOn:
		return []bool{true}
	case toggleModeOff:
		return []bool{false}
	case toggleModeBoth:
		return []bool{false, true}
	default:
		// This should never happen because a wrong value passed to this flag would
		// be caught during flag.Parse().
		return []bool{}
	}
}

func setCompressorMode(val string) []string {
	switch val {
	case compModeNop, compModeGzip, compModeOff:
		return []string{val}
	case compModeAll:
		return []string{compModeNop, compModeGzip, compModeOff}
	default:
		// This should never happen because a wrong value passed to this flag would
		// be caught during flag.Parse().
		return []string{}
	}
}

func main() {
	opts := processFlags()
	before(opts)

	s := stats.NewStats(numStatsBuckets)
	featuresNum := makeFeaturesNum(opts)
	sf := sharedFeatures(featuresNum)

	var (
		start  = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
		stop   = func(count uint64) { s.EndRun(count) }
		ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
	)

	for _, bf := range opts.generateFeatures(featuresNum) {
		grpc.EnableTracing = bf.EnableTrace
		if bf.EnableChannelz {
			channelz.TurnOn()
		}
		if opts.rModes.unary {
			unaryBenchmark(start, stop, bf, s)
		}
		if opts.rModes.streaming {
			streamBenchmark(start, stop, bf, s)
		}
		if opts.rModes.unconstrained {
			unconstrainedStreamBenchmark(start, ucStop, bf, s)
		}
	}
	after(opts, s.GetResults())
}

func before(opts *benchOpts) {
	if opts.memProfile != "" {
		runtime.MemProfileRate = opts.memProfileRate
	}
	if opts.cpuProfile != "" {
		f, err := os.Create(opts.cpuProfile)
		if err != nil {
			fmt.Fprintf(os.Stderr, "testing: %s\n", err)
			return
		}
		if err := pprof.StartCPUProfile(f); err != nil {
			fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
			f.Close()
			return
		}
	}
}

func after(opts *benchOpts, data []stats.BenchResults) {
	if opts.cpuProfile != "" {
		pprof.StopCPUProfile() // flushes profile to disk
	}
	if opts.memProfile != "" {
		f, err := os.Create(opts.memProfile)
		if err != nil {
			fmt.Fprintf(os.Stderr, "testing: %s\n", err)
			os.Exit(2)
		}
		runtime.GC() // materialize all statistics
		if err = pprof.WriteHeapProfile(f); err != nil {
			fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
			os.Exit(2)
		}
		f.Close()
	}
	if opts.benchmarkResultFile != "" {
		f, err := os.Create(opts.benchmarkResultFile)
		if err != nil {
			log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
		}
		dataEncoder := gob.NewEncoder(f)
		dataEncoder.Encode(data)
		f.Close()
	}
}

// nopCompressor is a compressor that just copies data.
type nopCompressor struct{}

func (nopCompressor) Do(w io.Writer, p []byte) error {
	n, err := w.Write(p)
	if err != nil {
		return err
	}
	if n != len(p) {
		return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p))
	}
	return nil
}

func (nopCompressor) Type() string { return compModeNop }

// nopDecompressor is a decompressor that just copies data.
type nopDecompressor struct{}

func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) }
func (nopDecompressor) Type() string                   { return compModeNop }