1/*
2 *
3 * Copyright 2017 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19/*
Package main provides benchmarks that are configured through command-line flags.
21
22An example to run some benchmarks with profiling enabled:
23
24go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
25  -compression=gzip -maxConcurrentCalls=1 -trace=off \
26  -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
27  -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result
28
As a suggestion, when creating a branch, you can run this benchmark and save the result
with "-resultFile=basePerf". Later, when you are in the middle of the work or have
finished it, you can run the benchmark again and compare the result with the base at any time.
32
Assume there are two result files named "basePerf" and "curPerf", created by adding
-resultFile=basePerf and -resultFile=curPerf.
35	To format the curPerf, run:
36  	go run benchmark/benchresult/main.go curPerf
37	To observe how the performance changes based on a base result, run:
38  	go run benchmark/benchresult/main.go basePerf curPerf
39*/
40package main
41
42import (
43	"context"
44	"encoding/gob"
45	"flag"
46	"fmt"
47	"io"
48	"io/ioutil"
49	"log"
50	"net"
51	"os"
52	"reflect"
53	"runtime"
54	"runtime/pprof"
55	"strings"
56	"sync"
57	"sync/atomic"
58	"time"
59
60	"google.golang.org/grpc"
61	bm "google.golang.org/grpc/benchmark"
62	"google.golang.org/grpc/benchmark/flags"
63	testpb "google.golang.org/grpc/benchmark/grpc_testing"
64	"google.golang.org/grpc/benchmark/latency"
65	"google.golang.org/grpc/benchmark/stats"
66	"google.golang.org/grpc/grpclog"
67	"google.golang.org/grpc/internal/channelz"
68	"google.golang.org/grpc/keepalive"
69	"google.golang.org/grpc/test/bufconn"
70)
71
// Command-line flags. processFlags reads these after flag.Parse and converts
// them into a benchOpts; the rest of the benchmark is meant to use that
// struct rather than the flags directly.
var (
	// Flags declared together with an explicit list of allowed values.
	workloads = flags.StringWithAllowedValues("workloads", workloadsAll,
		fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads)
	traceMode = flags.StringWithAllowedValues("trace", toggleModeOff,
		fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff,
		fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff,
		fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes)
	compressorMode = flags.StringWithAllowedValues("compression", compModeOff,
		fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes)
	networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone,
		"Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes)
	// Network simulation, concurrency, payload size, timing and memory
	// profiling flags. The *Slice flags hold several values; processFlags
	// copies them into featureOpts, from which all combinations are generated.
	readLatency           = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be a comma-separated list")
	readKbps              = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list")
	readMTU               = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list")
	maxConcurrentCalls    = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks")
	readReqSizeBytes      = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list")
	readRespSizeBytes     = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
	reqPayloadCurveFiles  = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of request payload sizes")
	respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of response payload sizes")
	benchTime             = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
	memProfile            = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
	memProfileRate        = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
		"memProfile should be set before setting profile rate. To include every allocated block in the profile, "+
		"set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.")
	// CPU profiling, result output, transport selection and keepalive flags.
	cpuProfile          = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided")
	benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file")
	useBufconn          = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O")
	enableKeepalive     = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+
		"Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.")
)
104
// String values accepted by the enumerated flags, plus fixed benchmark
// parameters.
const (
	// Workload types.
	workloadsUnary         = "unary"
	workloadsStreaming     = "streaming"
	workloadsUnconstrained = "unconstrained"
	workloadsAll           = "all"
	// Compression modes.
	compModeOff  = "off"
	compModeGzip = "gzip"
	compModeNop  = "nop"
	compModeAll  = "all"
	// Toggle modes.
	toggleModeOff  = "off"
	toggleModeOn   = "on"
	toggleModeBoth = "both"
	// Network modes.
	networkModeNone  = "none"
	networkModeLocal = "Local"
	networkModeLAN   = "LAN"
	networkModeWAN   = "WAN"
	networkLongHaul  = "Longhaul"

	// numStatsBuckets is the argument main passes to stats.NewStats.
	// warmupCallCount is the number of calls runBenchmark issues before it
	// starts measuring. warmuptime is how long unconstrainedStreamBenchmark
	// runs before it resets its counters and starts the measured run.
	numStatsBuckets = 10
	warmupCallCount = 10
	warmuptime      = time.Second
)
130
// Allowed-value lists for the enumerated flags, default values for the
// multi-valued flags, the lookup table from network mode name to a predefined
// latency.Network, and the keepalive settings applied by makeClient when
// keepalive is enabled.
var (
	allWorkloads              = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll}
	allCompModes              = []string{compModeOff, compModeGzip, compModeNop, compModeAll}
	allToggleModes            = []string{toggleModeOff, toggleModeOn, toggleModeBoth}
	allNetworkModes           = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul}
	defaultReadLatency        = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay.
	defaultReadKbps           = []int{0, 10240}                           // if non-positive, infinite
	defaultReadMTU            = []int{0}                                  // if non-positive, infinite
	defaultMaxConcurrentCalls = []int{1, 8, 64, 512}
	defaultReqSizeBytes       = []int{1, 1024, 1024 * 1024}
	defaultRespSizeBytes      = []int{1, 1024, 1024 * 1024}
	networks                  = map[string]latency.Network{
		networkModeLocal: latency.Local,
		networkModeLAN:   latency.LAN,
		networkModeWAN:   latency.WAN,
		networkLongHaul:  latency.Longhaul,
	}
	// keepaliveTime and keepaliveTimeout are used for both the client and the
	// server keepalive parameters.
	keepaliveTime    = 10 * time.Second
	keepaliveTimeout = 1 * time.Second
	// This is 0.8*keepaliveTime to prevent connection issues because of server
	// keepalive enforcement.
	keepaliveMinTime = 8 * time.Second
)
154
// runModes records which benchmark workloads are enabled. A value is built by
// `runModesFromWorkloads` from the workloads flag set by the user.
type runModes struct {
	unary         bool
	streaming     bool
	unconstrained bool
}
160
161// runModesFromWorkloads determines the runModes based on the value of
162// workloads flag set by the user.
163func runModesFromWorkloads(workload string) runModes {
164	r := runModes{}
165	switch workload {
166	case workloadsUnary:
167		r.unary = true
168	case workloadsStreaming:
169		r.streaming = true
170	case workloadsUnconstrained:
171		r.unconstrained = true
172	case workloadsAll:
173		r.unary = true
174		r.streaming = true
175		r.unconstrained = true
176	default:
177		log.Fatalf("Unknown workloads setting: %v (want one of: %v)",
178			workloads, strings.Join(allWorkloads, ", "))
179	}
180	return r
181}
182
// startFunc is called when the measured part of a benchmark begins. It
// receives the workload name and the feature set being exercised; main wires
// it to Stats.StartRun.
type startFunc func(mode string, bf stats.Features)

// stopFunc is called once all workers of a unary or streaming benchmark have
// finished, with the total number of completed calls; main wires it to
// Stats.EndRun.
type stopFunc func(count uint64)

// ucStopFunc is the stopFunc counterpart for the unconstrained workload. It
// receives the number of requests sent and responses received; main wires it
// to Stats.EndUnconstrainedRun.
type ucStopFunc func(req uint64, resp uint64)

// rpcCallFunc performs one call. pos is the index of the calling worker; the
// streaming implementation uses it to select that worker's stream.
type rpcCallFunc func(pos int)

// rpcSendFunc sends one request on the stream at index pos.
type rpcSendFunc func(pos int)

// rpcRecvFunc receives one response on the stream at index pos.
type rpcRecvFunc func(pos int)

// rpcCleanupFunc releases whatever the matching make* function set up.
type rpcCleanupFunc func()
190
191func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
192	caller, cleanup := makeFuncUnary(bf)
193	defer cleanup()
194	runBenchmark(caller, start, stop, bf, s, workloadsUnary)
195}
196
197func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
198	caller, cleanup := makeFuncStream(bf)
199	defer cleanup()
200	runBenchmark(caller, start, stop, bf, s, workloadsStreaming)
201}
202
203func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) {
204	var sender rpcSendFunc
205	var recver rpcRecvFunc
206	var cleanup rpcCleanupFunc
207	if bf.EnablePreloader {
208		sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf)
209	} else {
210		sender, recver, cleanup = makeFuncUnconstrainedStream(bf)
211	}
212	defer cleanup()
213
214	var req, resp uint64
215	go func() {
216		// Resets the counters once warmed up
217		<-time.NewTimer(warmuptime).C
218		atomic.StoreUint64(&req, 0)
219		atomic.StoreUint64(&resp, 0)
220		start(workloadsUnconstrained, bf)
221	}()
222
223	bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
224	var wg sync.WaitGroup
225	wg.Add(2 * bf.MaxConcurrentCalls)
226	for i := 0; i < bf.MaxConcurrentCalls; i++ {
227		go func(pos int) {
228			defer wg.Done()
229			for {
230				t := time.Now()
231				if t.After(bmEnd) {
232					return
233				}
234				sender(pos)
235				atomic.AddUint64(&req, 1)
236			}
237		}(i)
238		go func(pos int) {
239			defer wg.Done()
240			for {
241				t := time.Now()
242				if t.After(bmEnd) {
243					return
244				}
245				recver(pos)
246				atomic.AddUint64(&resp, 1)
247			}
248		}(i)
249	}
250	wg.Wait()
251	stop(req, resp)
252}
253
254// makeClient returns a gRPC client for the grpc.testing.BenchmarkService
255// service. The client is configured using the different options in the passed
256// 'bf'. Also returns a cleanup function to close the client and release
257// resources.
258func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) {
259	nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
260	opts := []grpc.DialOption{}
261	sopts := []grpc.ServerOption{}
262	if bf.ModeCompressor == compModeNop {
263		sopts = append(sopts,
264			grpc.RPCCompressor(nopCompressor{}),
265			grpc.RPCDecompressor(nopDecompressor{}),
266		)
267		opts = append(opts,
268			grpc.WithCompressor(nopCompressor{}),
269			grpc.WithDecompressor(nopDecompressor{}),
270		)
271	}
272	if bf.ModeCompressor == compModeGzip {
273		sopts = append(sopts,
274			grpc.RPCCompressor(grpc.NewGZIPCompressor()),
275			grpc.RPCDecompressor(grpc.NewGZIPDecompressor()),
276		)
277		opts = append(opts,
278			grpc.WithCompressor(grpc.NewGZIPCompressor()),
279			grpc.WithDecompressor(grpc.NewGZIPDecompressor()),
280		)
281	}
282	if bf.EnableKeepalive {
283		sopts = append(sopts,
284			grpc.KeepaliveParams(keepalive.ServerParameters{
285				Time:    keepaliveTime,
286				Timeout: keepaliveTimeout,
287			}),
288			grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
289				MinTime:             keepaliveMinTime,
290				PermitWithoutStream: true,
291			}),
292		)
293		opts = append(opts,
294			grpc.WithKeepaliveParams(keepalive.ClientParameters{
295				Time:                keepaliveTime,
296				Timeout:             keepaliveTimeout,
297				PermitWithoutStream: true,
298			}),
299		)
300	}
301	sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1)))
302	opts = append(opts, grpc.WithInsecure())
303
304	var lis net.Listener
305	if bf.UseBufConn {
306		bcLis := bufconn.Listen(256 * 1024)
307		lis = bcLis
308		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
309			return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) {
310				return bcLis.Dial()
311			})(ctx, "", "")
312		}))
313	} else {
314		var err error
315		lis, err = net.Listen("tcp", "localhost:0")
316		if err != nil {
317			grpclog.Fatalf("Failed to listen: %v", err)
318		}
319		opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) {
320			return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String())
321		}))
322	}
323	lis = nw.Listener(lis)
324	stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
325	conn := bm.NewClientConn("" /* target not used */, opts...)
326	return testpb.NewBenchmarkServiceClient(conn), func() {
327		conn.Close()
328		stopper()
329	}
330}
331
332func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
333	tc, cleanup := makeClient(bf)
334	return func(int) {
335		reqSizeBytes := bf.ReqSizeBytes
336		respSizeBytes := bf.RespSizeBytes
337		if bf.ReqPayloadCurve != nil {
338			reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
339		}
340		if bf.RespPayloadCurve != nil {
341			respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
342		}
343		unaryCaller(tc, reqSizeBytes, respSizeBytes)
344	}, cleanup
345}
346
347func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
348	tc, cleanup := makeClient(bf)
349
350	streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
351	for i := 0; i < bf.MaxConcurrentCalls; i++ {
352		stream, err := tc.StreamingCall(context.Background())
353		if err != nil {
354			grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
355		}
356		streams[i] = stream
357	}
358
359	return func(pos int) {
360		reqSizeBytes := bf.ReqSizeBytes
361		respSizeBytes := bf.RespSizeBytes
362		if bf.ReqPayloadCurve != nil {
363			reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
364		}
365		if bf.RespPayloadCurve != nil {
366			respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
367		}
368		streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
369	}, cleanup
370}
371
372func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
373	streams, req, cleanup := setupUnconstrainedStream(bf)
374
375	preparedMsg := make([]*grpc.PreparedMsg, len(streams))
376	for i, stream := range streams {
377		preparedMsg[i] = &grpc.PreparedMsg{}
378		err := preparedMsg[i].Encode(stream, req)
379		if err != nil {
380			grpclog.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err)
381		}
382	}
383
384	return func(pos int) {
385			streams[pos].SendMsg(preparedMsg[pos])
386		}, func(pos int) {
387			streams[pos].Recv()
388		}, cleanup
389}
390
391func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
392	streams, req, cleanup := setupUnconstrainedStream(bf)
393
394	return func(pos int) {
395			streams[pos].Send(req)
396		}, func(pos int) {
397			streams[pos].Recv()
398		}, cleanup
399}
400
401func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
402	tc, cleanup := makeClient(bf)
403
404	streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
405	for i := 0; i < bf.MaxConcurrentCalls; i++ {
406		stream, err := tc.UnconstrainedStreamingCall(context.Background())
407		if err != nil {
408			grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err)
409		}
410		streams[i] = stream
411	}
412
413	pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
414	req := &testpb.SimpleRequest{
415		ResponseType: pl.Type,
416		ResponseSize: int32(bf.RespSizeBytes),
417		Payload:      pl,
418	}
419
420	return streams, req, cleanup
421}
422
423// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
424// request and response sizes.
425func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) {
426	if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil {
427		grpclog.Fatalf("DoUnaryCall failed: %v", err)
428	}
429}
430
431func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
432	if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
433		grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err)
434	}
435}
436
437func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
438	// Warm up connection.
439	for i := 0; i < warmupCallCount; i++ {
440		caller(0)
441	}
442
443	// Run benchmark.
444	start(mode, bf)
445	var wg sync.WaitGroup
446	wg.Add(bf.MaxConcurrentCalls)
447	bmEnd := time.Now().Add(bf.BenchTime)
448	var count uint64
449	for i := 0; i < bf.MaxConcurrentCalls; i++ {
450		go func(pos int) {
451			defer wg.Done()
452			for {
453				t := time.Now()
454				if t.After(bmEnd) {
455					return
456				}
457				start := time.Now()
458				caller(pos)
459				elapse := time.Since(start)
460				atomic.AddUint64(&count, 1)
461				s.AddDuration(elapse)
462			}
463		}(i)
464	}
465	wg.Wait()
466	stop(count)
467}
468
// benchOpts represents all configurable options available while running this
// benchmark. This is built from the values passed as flags.
//
// processFlags fills every field: rModes comes from the workloads flag, the
// scalar fields are copies of the flags of the same name, and features holds
// the options that can take several values.
type benchOpts struct {
	rModes              runModes
	benchTime           time.Duration
	memProfileRate      int
	memProfile          string
	cpuProfile          string
	networkMode         string
	benchmarkResultFile string
	useBufconn          bool
	enableKeepalive     bool
	features            *featureOpts
}
483
// featureOpts represents options which can have multiple values. The user
// usually provides a comma-separated list of options for each of these
// features through command line flags. We generate all possible combinations
// for the provided values and run the benchmarks for each combination.
//
// makeFeaturesNum reports the length of each of these slices, and
// generateFeatures indexes into them to build each combination. For requests
// (and likewise for responses), generateFeatures uses the payload curves when
// any are present and the fixed sizes otherwise.
type featureOpts struct {
	enableTrace        []bool
	readLatencies      []time.Duration
	readKbps           []int
	readMTU            []int
	maxConcurrentCalls []int
	reqSizeBytes       []int
	respSizeBytes      []int
	reqPayloadCurves   []*stats.PayloadCurve
	respPayloadCurves  []*stats.PayloadCurve
	compModes          []string
	enableChannelz     []bool
	enablePreloader    []bool
}
502
503// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
504// element of the slice (indexed by 'featuresIndex' enum) contains the number
505// of features to be exercised by the benchmark code.
506// For example: Index 0 of the returned slice contains the number of values for
507// enableTrace feature, while index 1 contains the number of value of
508// readLatencies feature and so on.
509func makeFeaturesNum(b *benchOpts) []int {
510	featuresNum := make([]int, stats.MaxFeatureIndex)
511	for i := 0; i < len(featuresNum); i++ {
512		switch stats.FeatureIndex(i) {
513		case stats.EnableTraceIndex:
514			featuresNum[i] = len(b.features.enableTrace)
515		case stats.ReadLatenciesIndex:
516			featuresNum[i] = len(b.features.readLatencies)
517		case stats.ReadKbpsIndex:
518			featuresNum[i] = len(b.features.readKbps)
519		case stats.ReadMTUIndex:
520			featuresNum[i] = len(b.features.readMTU)
521		case stats.MaxConcurrentCallsIndex:
522			featuresNum[i] = len(b.features.maxConcurrentCalls)
523		case stats.ReqSizeBytesIndex:
524			featuresNum[i] = len(b.features.reqSizeBytes)
525		case stats.RespSizeBytesIndex:
526			featuresNum[i] = len(b.features.respSizeBytes)
527		case stats.ReqPayloadCurveIndex:
528			featuresNum[i] = len(b.features.reqPayloadCurves)
529		case stats.RespPayloadCurveIndex:
530			featuresNum[i] = len(b.features.respPayloadCurves)
531		case stats.CompModesIndex:
532			featuresNum[i] = len(b.features.compModes)
533		case stats.EnableChannelzIndex:
534			featuresNum[i] = len(b.features.enableChannelz)
535		case stats.EnablePreloaderIndex:
536			featuresNum[i] = len(b.features.enablePreloader)
537		default:
538			log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
539		}
540	}
541	return featuresNum
542}
543
// sharedFeatures converts per-feature value counts into a bool slice that
// acts as a bitmask, indexed the same way as 'featuresNum'. An entry is true
// when the corresponding feature has at most one value, meaning the feature
// is shared amongst all benchmarks.
func sharedFeatures(featuresNum []int) []bool {
	shared := make([]bool, 0, len(featuresNum))
	for _, count := range featuresNum {
		shared = append(shared, count <= 1)
	}
	return shared
}
557
558// generateFeatures generates all combinations of the provided feature options.
559// While all the feature options are stored in the benchOpts struct, the input
560// parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing
561// the number of values for each feature.
562// For example, let's say the user sets -workloads=all and
563// -maxConcurrentCalls=1,100, this would end up with the following
564// combinations:
565// [workloads: unary, maxConcurrentCalls=1]
566// [workloads: unary, maxConcurrentCalls=1]
567// [workloads: streaming, maxConcurrentCalls=100]
568// [workloads: streaming, maxConcurrentCalls=100]
569// [workloads: unconstrained, maxConcurrentCalls=1]
570// [workloads: unconstrained, maxConcurrentCalls=100]
571func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
572	// curPos and initialPos are two slices where each value acts as an index
573	// into the appropriate feature slice maintained in benchOpts.features. This
574	// loop generates all possible combinations of features by changing one value
575	// at a time, and once curPos becomes equal to initialPos, we have explored
576	// all options.
577	var result []stats.Features
578	var curPos []int
579	initialPos := make([]int, stats.MaxFeatureIndex)
580	for !reflect.DeepEqual(initialPos, curPos) {
581		if curPos == nil {
582			curPos = make([]int, stats.MaxFeatureIndex)
583		}
584		f := stats.Features{
585			// These features stay the same for each iteration.
586			NetworkMode:     b.networkMode,
587			UseBufConn:      b.useBufconn,
588			EnableKeepalive: b.enableKeepalive,
589			BenchTime:       b.benchTime,
590			// These features can potentially change for each iteration.
591			EnableTrace:        b.features.enableTrace[curPos[stats.EnableTraceIndex]],
592			Latency:            b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
593			Kbps:               b.features.readKbps[curPos[stats.ReadKbpsIndex]],
594			MTU:                b.features.readMTU[curPos[stats.ReadMTUIndex]],
595			MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]],
596			ModeCompressor:     b.features.compModes[curPos[stats.CompModesIndex]],
597			EnableChannelz:     b.features.enableChannelz[curPos[stats.EnableChannelzIndex]],
598			EnablePreloader:    b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]],
599		}
600		if len(b.features.reqPayloadCurves) == 0 {
601			f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
602		} else {
603			f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]]
604		}
605		if len(b.features.respPayloadCurves) == 0 {
606			f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]]
607		} else {
608			f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
609		}
610		result = append(result, f)
611		addOne(curPos, featuresNum)
612	}
613	return result
614}
615
// addOne advances the input slice 'features' in place to the next combination
// of feature values, like incrementing a mixed-radix counter whose last
// element is the least significant digit. 'featuresMaxPosition' provides the
// number of allowed values for each feature, indexed by the 'featureIndex'
// enum; a feature with zero allowed values is skipped. When every digit
// overflows, the counter wraps around to all zeros.
func addOne(features []int, featuresMaxPosition []int) {
	for i := len(features) - 1; i >= 0; i-- {
		limit := featuresMaxPosition[i]
		if limit == 0 {
			continue
		}
		next := features[i] + 1
		if next/limit == 0 {
			// No overflow in this digit: store it and stop carrying.
			features[i] = next
			return
		}
		// Overflow: wrap this digit and carry into the next one.
		features[i] = next % limit
	}
}
632
633// processFlags reads the command line flags and builds benchOpts. Specifying
634// invalid values for certain flags will cause flag.Parse() to fail, and the
635// program to terminate.
636// This *SHOULD* be the only place where the flags are accessed. All other
637// parts of the benchmark code should rely on the returned benchOpts.
638func processFlags() *benchOpts {
639	flag.Parse()
640	if flag.NArg() != 0 {
641		log.Fatal("Error: unparsed arguments: ", flag.Args())
642	}
643
644	opts := &benchOpts{
645		rModes:              runModesFromWorkloads(*workloads),
646		benchTime:           *benchTime,
647		memProfileRate:      *memProfileRate,
648		memProfile:          *memProfile,
649		cpuProfile:          *cpuProfile,
650		networkMode:         *networkMode,
651		benchmarkResultFile: *benchmarkResultFile,
652		useBufconn:          *useBufconn,
653		enableKeepalive:     *enableKeepalive,
654		features: &featureOpts{
655			enableTrace:        setToggleMode(*traceMode),
656			readLatencies:      append([]time.Duration(nil), *readLatency...),
657			readKbps:           append([]int(nil), *readKbps...),
658			readMTU:            append([]int(nil), *readMTU...),
659			maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
660			reqSizeBytes:       append([]int(nil), *readReqSizeBytes...),
661			respSizeBytes:      append([]int(nil), *readRespSizeBytes...),
662			compModes:          setCompressorMode(*compressorMode),
663			enableChannelz:     setToggleMode(*channelzOn),
664			enablePreloader:    setToggleMode(*preloaderMode),
665		},
666	}
667
668	if len(*reqPayloadCurveFiles) == 0 {
669		if len(opts.features.reqSizeBytes) == 0 {
670			opts.features.reqSizeBytes = defaultReqSizeBytes
671		}
672	} else {
673		if len(opts.features.reqSizeBytes) != 0 {
674			log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time")
675		}
676		for _, file := range *reqPayloadCurveFiles {
677			pc, err := stats.NewPayloadCurve(file)
678			if err != nil {
679				log.Fatalf("cannot load payload curve file %s: %v", file, err)
680			}
681			opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc)
682		}
683		opts.features.reqSizeBytes = nil
684	}
685	if len(*respPayloadCurveFiles) == 0 {
686		if len(opts.features.respSizeBytes) == 0 {
687			opts.features.respSizeBytes = defaultRespSizeBytes
688		}
689	} else {
690		if len(opts.features.respSizeBytes) != 0 {
691			log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time")
692		}
693		for _, file := range *respPayloadCurveFiles {
694			pc, err := stats.NewPayloadCurve(file)
695			if err != nil {
696				log.Fatalf("cannot load payload curve file %s: %v", file, err)
697			}
698			opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc)
699		}
700		opts.features.respSizeBytes = nil
701	}
702
703	// Re-write latency, kpbs and mtu if network mode is set.
704	if network, ok := networks[opts.networkMode]; ok {
705		opts.features.readLatencies = []time.Duration{network.Latency}
706		opts.features.readKbps = []int{network.Kbps}
707		opts.features.readMTU = []int{network.MTU}
708	}
709	return opts
710}
711
712func setToggleMode(val string) []bool {
713	switch val {
714	case toggleModeOn:
715		return []bool{true}
716	case toggleModeOff:
717		return []bool{false}
718	case toggleModeBoth:
719		return []bool{false, true}
720	default:
721		// This should never happen because a wrong value passed to this flag would
722		// be caught during flag.Parse().
723		return []bool{}
724	}
725}
726
727func setCompressorMode(val string) []string {
728	switch val {
729	case compModeNop, compModeGzip, compModeOff:
730		return []string{val}
731	case compModeAll:
732		return []string{compModeNop, compModeGzip, compModeOff}
733	default:
734		// This should never happen because a wrong value passed to this flag would
735		// be caught during flag.Parse().
736		return []string{}
737	}
738}
739
740func main() {
741	opts := processFlags()
742	before(opts)
743
744	s := stats.NewStats(numStatsBuckets)
745	featuresNum := makeFeaturesNum(opts)
746	sf := sharedFeatures(featuresNum)
747
748	var (
749		start  = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) }
750		stop   = func(count uint64) { s.EndRun(count) }
751		ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) }
752	)
753
754	for _, bf := range opts.generateFeatures(featuresNum) {
755		grpc.EnableTracing = bf.EnableTrace
756		if bf.EnableChannelz {
757			channelz.TurnOn()
758		}
759		if opts.rModes.unary {
760			unaryBenchmark(start, stop, bf, s)
761		}
762		if opts.rModes.streaming {
763			streamBenchmark(start, stop, bf, s)
764		}
765		if opts.rModes.unconstrained {
766			unconstrainedStreamBenchmark(start, ucStop, bf, s)
767		}
768	}
769	after(opts, s.GetResults())
770}
771
772func before(opts *benchOpts) {
773	if opts.memProfile != "" {
774		runtime.MemProfileRate = opts.memProfileRate
775	}
776	if opts.cpuProfile != "" {
777		f, err := os.Create(opts.cpuProfile)
778		if err != nil {
779			fmt.Fprintf(os.Stderr, "testing: %s\n", err)
780			return
781		}
782		if err := pprof.StartCPUProfile(f); err != nil {
783			fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err)
784			f.Close()
785			return
786		}
787	}
788}
789
790func after(opts *benchOpts, data []stats.BenchResults) {
791	if opts.cpuProfile != "" {
792		pprof.StopCPUProfile() // flushes profile to disk
793	}
794	if opts.memProfile != "" {
795		f, err := os.Create(opts.memProfile)
796		if err != nil {
797			fmt.Fprintf(os.Stderr, "testing: %s\n", err)
798			os.Exit(2)
799		}
800		runtime.GC() // materialize all statistics
801		if err = pprof.WriteHeapProfile(f); err != nil {
802			fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err)
803			os.Exit(2)
804		}
805		f.Close()
806	}
807	if opts.benchmarkResultFile != "" {
808		f, err := os.Create(opts.benchmarkResultFile)
809		if err != nil {
810			log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err)
811		}
812		dataEncoder := gob.NewEncoder(f)
813		dataEncoder.Encode(data)
814		f.Close()
815	}
816}
817
// nopCompressor is a compressor that performs no compression: it just copies
// data.
type nopCompressor struct{}

// Do writes p to w unchanged. It returns the error reported by w, or an error
// of its own when w accepted fewer bytes than len(p) without reporting one.
func (nopCompressor) Do(w io.Writer, p []byte) error {
	switch n, err := w.Write(p); {
	case err != nil:
		return err
	case n != len(p):
		return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p))
	default:
		return nil
	}
}
831
832func (nopCompressor) Type() string { return compModeNop }
833
// nopDecompressor is a decompressor that performs no decompression: it just
// copies data.
type nopDecompressor struct{}

// Do reads everything from r and returns it unchanged, together with any
// error reported by ioutil.ReadAll.
func (nopDecompressor) Do(r io.Reader) ([]byte, error) {
	data, err := ioutil.ReadAll(r)
	return data, err
}
838func (nopDecompressor) Type() string                   { return compModeNop }
839