/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
Package main provides benchmark with setting flags.

An example to run some benchmarks with profiling enabled:

go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
  -compression=gzip -maxConcurrentCalls=1 -trace=off \
  -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
  -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result

As a suggestion, when creating a branch, you can run this benchmark and save the result
file "-resultFile=basePerf"; later, when you are in the middle of the work or have
finished it, you can run the benchmark again and compare the result with the base at any time.

Assume there are two result files named "basePerf" and "curPerf", created by adding
-resultFile=basePerf and -resultFile=curPerf.
35 To format the curPerf, run: 36 go run benchmark/benchresult/main.go curPerf 37 To observe how the performance changes based on a base result, run: 38 go run benchmark/benchresult/main.go basePerf curPerf 39*/ 40package main 41 42import ( 43 "context" 44 "encoding/gob" 45 "flag" 46 "fmt" 47 "io" 48 "io/ioutil" 49 "log" 50 "net" 51 "os" 52 "reflect" 53 "runtime" 54 "runtime/pprof" 55 "strings" 56 "sync" 57 "sync/atomic" 58 "time" 59 60 "google.golang.org/grpc" 61 bm "google.golang.org/grpc/benchmark" 62 "google.golang.org/grpc/benchmark/flags" 63 testpb "google.golang.org/grpc/benchmark/grpc_testing" 64 "google.golang.org/grpc/benchmark/latency" 65 "google.golang.org/grpc/benchmark/stats" 66 "google.golang.org/grpc/grpclog" 67 "google.golang.org/grpc/internal/channelz" 68 "google.golang.org/grpc/keepalive" 69 "google.golang.org/grpc/test/bufconn" 70) 71 72var ( 73 workloads = flags.StringWithAllowedValues("workloads", workloadsAll, 74 fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads) 75 traceMode = flags.StringWithAllowedValues("trace", toggleModeOff, 76 fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) 77 preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff, 78 fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) 79 channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff, 80 fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) 81 compressorMode = flags.StringWithAllowedValues("compression", compModeOff, 82 fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes) 83 networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone, 84 "Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes) 85 readLatency = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be 
a comma-separated list") 86 readKbps = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list") 87 readMTU = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list") 88 maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks") 89 readReqSizeBytes = flags.IntSlice("reqSizeBytes", defaultReqSizeBytes, "Request size in bytes - may be a comma-separated list") 90 readRespSizeBytes = flags.IntSlice("respSizeBytes", defaultRespSizeBytes, "Response size in bytes - may be a comma-separated list") 91 benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark") 92 memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.") 93 memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+ 94 "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+ 95 "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.") 96 cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided") 97 benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file") 98 useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") 99 enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. \n"+ 100 "Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.") 101) 102 103const ( 104 workloadsUnary = "unary" 105 workloadsStreaming = "streaming" 106 workloadsUnconstrained = "unconstrained" 107 workloadsAll = "all" 108 // Compression modes. 
109 compModeOff = "off" 110 compModeGzip = "gzip" 111 compModeNop = "nop" 112 compModeAll = "all" 113 // Toggle modes. 114 toggleModeOff = "off" 115 toggleModeOn = "on" 116 toggleModeBoth = "both" 117 // Network modes. 118 networkModeNone = "none" 119 networkModeLocal = "Local" 120 networkModeLAN = "LAN" 121 networkModeWAN = "WAN" 122 networkLongHaul = "Longhaul" 123 124 numStatsBuckets = 10 125 warmupCallCount = 10 126 warmuptime = time.Second 127) 128 129var ( 130 allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll} 131 allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll} 132 allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth} 133 allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul} 134 defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. 135 defaultReadKbps = []int{0, 10240} // if non-positive, infinite 136 defaultReadMTU = []int{0} // if non-positive, infinite 137 defaultMaxConcurrentCalls = []int{1, 8, 64, 512} 138 defaultReqSizeBytes = []int{1, 1024, 1024 * 1024} 139 defaultRespSizeBytes = []int{1, 1024, 1024 * 1024} 140 networks = map[string]latency.Network{ 141 networkModeLocal: latency.Local, 142 networkModeLAN: latency.LAN, 143 networkModeWAN: latency.WAN, 144 networkLongHaul: latency.Longhaul, 145 } 146 keepaliveTime = 10 * time.Second 147 keepaliveTimeout = 1 * time.Second 148 // This is 0.8*keepaliveTime to prevent connection issues because of server 149 // keepalive enforcement. 150 keepaliveMinTime = 8 * time.Second 151) 152 153// runModes indicates the workloads to run. This is initialized with a call to 154// `runModesFromWorkloads`, passing the workloads flag set by the user. 155type runModes struct { 156 unary, streaming, unconstrained bool 157} 158 159// runModesFromWorkloads determines the runModes based on the value of 160// workloads flag set by the user. 
161func runModesFromWorkloads(workload string) runModes { 162 r := runModes{} 163 switch workload { 164 case workloadsUnary: 165 r.unary = true 166 case workloadsStreaming: 167 r.streaming = true 168 case workloadsUnconstrained: 169 r.unconstrained = true 170 case workloadsAll: 171 r.unary = true 172 r.streaming = true 173 r.unconstrained = true 174 default: 175 log.Fatalf("Unknown workloads setting: %v (want one of: %v)", 176 workloads, strings.Join(allWorkloads, ", ")) 177 } 178 return r 179} 180 181type startFunc func(mode string, bf stats.Features) 182type stopFunc func(count uint64) 183type ucStopFunc func(req uint64, resp uint64) 184type rpcCallFunc func(pos int) 185type rpcSendFunc func(pos int) 186type rpcRecvFunc func(pos int) 187type rpcCleanupFunc func() 188 189func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) { 190 caller, cleanup := makeFuncUnary(bf) 191 defer cleanup() 192 runBenchmark(caller, start, stop, bf, s, workloadsUnary) 193} 194 195func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) { 196 caller, cleanup := makeFuncStream(bf) 197 defer cleanup() 198 runBenchmark(caller, start, stop, bf, s, workloadsStreaming) 199} 200 201func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) { 202 var sender rpcSendFunc 203 var recver rpcRecvFunc 204 var cleanup rpcCleanupFunc 205 if bf.EnablePreloader { 206 sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf) 207 } else { 208 sender, recver, cleanup = makeFuncUnconstrainedStream(bf) 209 } 210 defer cleanup() 211 212 var req, resp uint64 213 go func() { 214 // Resets the counters once warmed up 215 <-time.NewTimer(warmuptime).C 216 atomic.StoreUint64(&req, 0) 217 atomic.StoreUint64(&resp, 0) 218 start(workloadsUnconstrained, bf) 219 }() 220 221 bmEnd := time.Now().Add(bf.BenchTime + warmuptime) 222 var wg sync.WaitGroup 223 wg.Add(2 * bf.MaxConcurrentCalls) 224 for i := 0; i < 
bf.MaxConcurrentCalls; i++ { 225 go func(pos int) { 226 defer wg.Done() 227 for { 228 t := time.Now() 229 if t.After(bmEnd) { 230 return 231 } 232 sender(pos) 233 atomic.AddUint64(&req, 1) 234 } 235 }(i) 236 go func(pos int) { 237 defer wg.Done() 238 for { 239 t := time.Now() 240 if t.After(bmEnd) { 241 return 242 } 243 recver(pos) 244 atomic.AddUint64(&resp, 1) 245 } 246 }(i) 247 } 248 wg.Wait() 249 stop(req, resp) 250} 251 252// makeClient returns a gRPC client for the grpc.testing.BenchmarkService 253// service. The client is configured using the different options in the passed 254// 'bf'. Also returns a cleanup function to close the client and release 255// resources. 256func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) { 257 nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU} 258 opts := []grpc.DialOption{} 259 sopts := []grpc.ServerOption{} 260 if bf.ModeCompressor == compModeNop { 261 sopts = append(sopts, 262 grpc.RPCCompressor(nopCompressor{}), 263 grpc.RPCDecompressor(nopDecompressor{}), 264 ) 265 opts = append(opts, 266 grpc.WithCompressor(nopCompressor{}), 267 grpc.WithDecompressor(nopDecompressor{}), 268 ) 269 } 270 if bf.ModeCompressor == compModeGzip { 271 sopts = append(sopts, 272 grpc.RPCCompressor(grpc.NewGZIPCompressor()), 273 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), 274 ) 275 opts = append(opts, 276 grpc.WithCompressor(grpc.NewGZIPCompressor()), 277 grpc.WithDecompressor(grpc.NewGZIPDecompressor()), 278 ) 279 } 280 if bf.EnableKeepalive { 281 sopts = append(sopts, 282 grpc.KeepaliveParams(keepalive.ServerParameters{ 283 Time: keepaliveTime, 284 Timeout: keepaliveTimeout, 285 }), 286 grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ 287 MinTime: keepaliveMinTime, 288 PermitWithoutStream: true, 289 }), 290 ) 291 opts = append(opts, 292 grpc.WithKeepaliveParams(keepalive.ClientParameters{ 293 Time: keepaliveTime, 294 Timeout: keepaliveTimeout, 295 PermitWithoutStream: true, 296 }), 
297 ) 298 } 299 sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1))) 300 opts = append(opts, grpc.WithInsecure()) 301 302 var lis net.Listener 303 if bf.UseBufConn { 304 bcLis := bufconn.Listen(256 * 1024) 305 lis = bcLis 306 opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { 307 return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) { 308 return bcLis.Dial() 309 })(ctx, "", "") 310 })) 311 } else { 312 var err error 313 lis, err = net.Listen("tcp", "localhost:0") 314 if err != nil { 315 grpclog.Fatalf("Failed to listen: %v", err) 316 } 317 opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { 318 return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String()) 319 })) 320 } 321 lis = nw.Listener(lis) 322 stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...) 323 conn := bm.NewClientConn("" /* target not used */, opts...) 
324 return testpb.NewBenchmarkServiceClient(conn), func() { 325 conn.Close() 326 stopper() 327 } 328} 329 330func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { 331 tc, cleanup := makeClient(bf) 332 return func(int) { 333 unaryCaller(tc, bf.ReqSizeBytes, bf.RespSizeBytes) 334 }, cleanup 335} 336 337func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { 338 tc, cleanup := makeClient(bf) 339 340 streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) 341 for i := 0; i < bf.MaxConcurrentCalls; i++ { 342 stream, err := tc.StreamingCall(context.Background()) 343 if err != nil { 344 grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) 345 } 346 streams[i] = stream 347 } 348 349 return func(pos int) { 350 streamCaller(streams[pos], bf.ReqSizeBytes, bf.RespSizeBytes) 351 }, cleanup 352} 353 354func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { 355 streams, req, cleanup := setupUnconstrainedStream(bf) 356 357 preparedMsg := make([]*grpc.PreparedMsg, len(streams)) 358 for i, stream := range streams { 359 preparedMsg[i] = &grpc.PreparedMsg{} 360 err := preparedMsg[i].Encode(stream, req) 361 if err != nil { 362 grpclog.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err) 363 } 364 } 365 366 return func(pos int) { 367 streams[pos].SendMsg(preparedMsg[pos]) 368 }, func(pos int) { 369 streams[pos].Recv() 370 }, cleanup 371} 372 373func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { 374 streams, req, cleanup := setupUnconstrainedStream(bf) 375 376 return func(pos int) { 377 streams[pos].Send(req) 378 }, func(pos int) { 379 streams[pos].Recv() 380 }, cleanup 381} 382 383func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { 384 tc, cleanup := makeClient(bf) 385 386 streams := 
make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) 387 for i := 0; i < bf.MaxConcurrentCalls; i++ { 388 stream, err := tc.UnconstrainedStreamingCall(context.Background()) 389 if err != nil { 390 grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err) 391 } 392 streams[i] = stream 393 } 394 395 pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes) 396 req := &testpb.SimpleRequest{ 397 ResponseType: pl.Type, 398 ResponseSize: int32(bf.RespSizeBytes), 399 Payload: pl, 400 } 401 402 return streams, req, cleanup 403} 404 405// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and 406// request and response sizes. 407func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { 408 if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil { 409 grpclog.Fatalf("DoUnaryCall failed: %v", err) 410 } 411} 412 413func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { 414 if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { 415 grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) 416 } 417} 418 419func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) { 420 // Warm up connection. 421 for i := 0; i < warmupCallCount; i++ { 422 caller(0) 423 } 424 425 // Run benchmark. 426 start(mode, bf) 427 var wg sync.WaitGroup 428 wg.Add(bf.MaxConcurrentCalls) 429 bmEnd := time.Now().Add(bf.BenchTime) 430 var count uint64 431 for i := 0; i < bf.MaxConcurrentCalls; i++ { 432 go func(pos int) { 433 defer wg.Done() 434 for { 435 t := time.Now() 436 if t.After(bmEnd) { 437 return 438 } 439 start := time.Now() 440 caller(pos) 441 elapse := time.Since(start) 442 atomic.AddUint64(&count, 1) 443 s.AddDuration(elapse) 444 } 445 }(i) 446 } 447 wg.Wait() 448 stop(count) 449} 450 451// benchOpts represents all configurable options available while running this 452// benchmark. 
This is built from the values passed as flags. 453type benchOpts struct { 454 rModes runModes 455 benchTime time.Duration 456 memProfileRate int 457 memProfile string 458 cpuProfile string 459 networkMode string 460 benchmarkResultFile string 461 useBufconn bool 462 enableKeepalive bool 463 features *featureOpts 464} 465 466// featureOpts represents options which can have multiple values. The user 467// usually provides a comma-separated list of options for each of these 468// features through command line flags. We generate all possible combinations 469// for the provided values and run the benchmarks for each combination. 470type featureOpts struct { 471 enableTrace []bool 472 readLatencies []time.Duration 473 readKbps []int 474 readMTU []int 475 maxConcurrentCalls []int 476 reqSizeBytes []int 477 respSizeBytes []int 478 compModes []string 479 enableChannelz []bool 480 enablePreloader []bool 481} 482 483// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each 484// element of the slice (indexed by 'featuresIndex' enum) contains the number 485// of features to be exercised by the benchmark code. 486// For example: Index 0 of the returned slice contains the number of values for 487// enableTrace feature, while index 1 contains the number of value of 488// readLatencies feature and so on. 
489func makeFeaturesNum(b *benchOpts) []int { 490 featuresNum := make([]int, stats.MaxFeatureIndex) 491 for i := 0; i < len(featuresNum); i++ { 492 switch stats.FeatureIndex(i) { 493 case stats.EnableTraceIndex: 494 featuresNum[i] = len(b.features.enableTrace) 495 case stats.ReadLatenciesIndex: 496 featuresNum[i] = len(b.features.readLatencies) 497 case stats.ReadKbpsIndex: 498 featuresNum[i] = len(b.features.readKbps) 499 case stats.ReadMTUIndex: 500 featuresNum[i] = len(b.features.readMTU) 501 case stats.MaxConcurrentCallsIndex: 502 featuresNum[i] = len(b.features.maxConcurrentCalls) 503 case stats.ReqSizeBytesIndex: 504 featuresNum[i] = len(b.features.reqSizeBytes) 505 case stats.RespSizeBytesIndex: 506 featuresNum[i] = len(b.features.respSizeBytes) 507 case stats.CompModesIndex: 508 featuresNum[i] = len(b.features.compModes) 509 case stats.EnableChannelzIndex: 510 featuresNum[i] = len(b.features.enableChannelz) 511 case stats.EnablePreloaderIndex: 512 featuresNum[i] = len(b.features.enablePreloader) 513 default: 514 log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex) 515 } 516 } 517 return featuresNum 518} 519 520// sharedFeatures returns a bool slice which acts as a bitmask. Each item in 521// the slice represents a feature, indexed by 'featureIndex' enum. The bit is 522// set to 1 if the corresponding feature does not have multiple value, so is 523// shared amongst all benchmarks. 524func sharedFeatures(featuresNum []int) []bool { 525 result := make([]bool, len(featuresNum)) 526 for i, num := range featuresNum { 527 if num <= 1 { 528 result[i] = true 529 } 530 } 531 return result 532} 533 534// generateFeatures generates all combinations of the provided feature options. 535// While all the feature options are stored in the benchOpts struct, the input 536// parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing 537// the number of values for each feature. 
538// For example, let's say the user sets -workloads=all and 539// -maxConcurrentCalls=1,100, this would end up with the following 540// combinations: 541// [workloads: unary, maxConcurrentCalls=1] 542// [workloads: unary, maxConcurrentCalls=1] 543// [workloads: streaming, maxConcurrentCalls=100] 544// [workloads: streaming, maxConcurrentCalls=100] 545// [workloads: unconstrained, maxConcurrentCalls=1] 546// [workloads: unconstrained, maxConcurrentCalls=100] 547func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { 548 // curPos and initialPos are two slices where each value acts as an index 549 // into the appropriate feature slice maintained in benchOpts.features. This 550 // loop generates all possible combinations of features by changing one value 551 // at a time, and once curPos becomes equal to initialPos, we have explored 552 // all options. 553 var result []stats.Features 554 var curPos []int 555 initialPos := make([]int, stats.MaxFeatureIndex) 556 for !reflect.DeepEqual(initialPos, curPos) { 557 if curPos == nil { 558 curPos = make([]int, stats.MaxFeatureIndex) 559 } 560 result = append(result, stats.Features{ 561 // These features stay the same for each iteration. 562 NetworkMode: b.networkMode, 563 UseBufConn: b.useBufconn, 564 EnableKeepalive: b.enableKeepalive, 565 BenchTime: b.benchTime, 566 // These features can potentially change for each iteration. 
567 EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]], 568 Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]], 569 Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]], 570 MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]], 571 MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]], 572 ReqSizeBytes: b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]], 573 RespSizeBytes: b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]], 574 ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]], 575 EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]], 576 EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]], 577 }) 578 addOne(curPos, featuresNum) 579 } 580 return result 581} 582 583// addOne mutates the input slice 'features' by changing one feature, thus 584// arriving at the next combination of feature values. 'featuresMaxPosition' 585// provides the numbers of allowed values for each feature, indexed by 586// 'featureIndex' enum. 587func addOne(features []int, featuresMaxPosition []int) { 588 for i := len(features) - 1; i >= 0; i-- { 589 features[i] = (features[i] + 1) 590 if features[i]/featuresMaxPosition[i] == 0 { 591 break 592 } 593 features[i] = features[i] % featuresMaxPosition[i] 594 } 595} 596 597// processFlags reads the command line flags and builds benchOpts. Specifying 598// invalid values for certain flags will cause flag.Parse() to fail, and the 599// program to terminate. 600// This *SHOULD* be the only place where the flags are accessed. All other 601// parts of the benchmark code should rely on the returned benchOpts. 
602func processFlags() *benchOpts { 603 flag.Parse() 604 if flag.NArg() != 0 { 605 log.Fatal("Error: unparsed arguments: ", flag.Args()) 606 } 607 608 opts := &benchOpts{ 609 rModes: runModesFromWorkloads(*workloads), 610 benchTime: *benchTime, 611 memProfileRate: *memProfileRate, 612 memProfile: *memProfile, 613 cpuProfile: *cpuProfile, 614 networkMode: *networkMode, 615 benchmarkResultFile: *benchmarkResultFile, 616 useBufconn: *useBufconn, 617 enableKeepalive: *enableKeepalive, 618 features: &featureOpts{ 619 enableTrace: setToggleMode(*traceMode), 620 readLatencies: append([]time.Duration(nil), *readLatency...), 621 readKbps: append([]int(nil), *readKbps...), 622 readMTU: append([]int(nil), *readMTU...), 623 maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...), 624 reqSizeBytes: append([]int(nil), *readReqSizeBytes...), 625 respSizeBytes: append([]int(nil), *readRespSizeBytes...), 626 compModes: setCompressorMode(*compressorMode), 627 enableChannelz: setToggleMode(*channelzOn), 628 enablePreloader: setToggleMode(*preloaderMode), 629 }, 630 } 631 632 // Re-write latency, kpbs and mtu if network mode is set. 633 if network, ok := networks[opts.networkMode]; ok { 634 opts.features.readLatencies = []time.Duration{network.Latency} 635 opts.features.readKbps = []int{network.Kbps} 636 opts.features.readMTU = []int{network.MTU} 637 } 638 return opts 639} 640 641func setToggleMode(val string) []bool { 642 switch val { 643 case toggleModeOn: 644 return []bool{true} 645 case toggleModeOff: 646 return []bool{false} 647 case toggleModeBoth: 648 return []bool{false, true} 649 default: 650 // This should never happen because a wrong value passed to this flag would 651 // be caught during flag.Parse(). 
652 return []bool{} 653 } 654} 655 656func setCompressorMode(val string) []string { 657 switch val { 658 case compModeNop, compModeGzip, compModeOff: 659 return []string{val} 660 case compModeAll: 661 return []string{compModeNop, compModeGzip, compModeOff} 662 default: 663 // This should never happen because a wrong value passed to this flag would 664 // be caught during flag.Parse(). 665 return []string{} 666 } 667} 668 669func main() { 670 opts := processFlags() 671 before(opts) 672 673 s := stats.NewStats(numStatsBuckets) 674 featuresNum := makeFeaturesNum(opts) 675 sf := sharedFeatures(featuresNum) 676 677 var ( 678 start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) } 679 stop = func(count uint64) { s.EndRun(count) } 680 ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) } 681 ) 682 683 for _, bf := range opts.generateFeatures(featuresNum) { 684 grpc.EnableTracing = bf.EnableTrace 685 if bf.EnableChannelz { 686 channelz.TurnOn() 687 } 688 if opts.rModes.unary { 689 unaryBenchmark(start, stop, bf, s) 690 } 691 if opts.rModes.streaming { 692 streamBenchmark(start, stop, bf, s) 693 } 694 if opts.rModes.unconstrained { 695 unconstrainedStreamBenchmark(start, ucStop, bf, s) 696 } 697 } 698 after(opts, s.GetResults()) 699} 700 701func before(opts *benchOpts) { 702 if opts.memProfile != "" { 703 runtime.MemProfileRate = opts.memProfileRate 704 } 705 if opts.cpuProfile != "" { 706 f, err := os.Create(opts.cpuProfile) 707 if err != nil { 708 fmt.Fprintf(os.Stderr, "testing: %s\n", err) 709 return 710 } 711 if err := pprof.StartCPUProfile(f); err != nil { 712 fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err) 713 f.Close() 714 return 715 } 716 } 717} 718 719func after(opts *benchOpts, data []stats.BenchResults) { 720 if opts.cpuProfile != "" { 721 pprof.StopCPUProfile() // flushes profile to disk 722 } 723 if opts.memProfile != "" { 724 f, err := os.Create(opts.memProfile) 725 if err != nil { 726 
fmt.Fprintf(os.Stderr, "testing: %s\n", err) 727 os.Exit(2) 728 } 729 runtime.GC() // materialize all statistics 730 if err = pprof.WriteHeapProfile(f); err != nil { 731 fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err) 732 os.Exit(2) 733 } 734 f.Close() 735 } 736 if opts.benchmarkResultFile != "" { 737 f, err := os.Create(opts.benchmarkResultFile) 738 if err != nil { 739 log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err) 740 } 741 dataEncoder := gob.NewEncoder(f) 742 dataEncoder.Encode(data) 743 f.Close() 744 } 745} 746 747// nopCompressor is a compressor that just copies data. 748type nopCompressor struct{} 749 750func (nopCompressor) Do(w io.Writer, p []byte) error { 751 n, err := w.Write(p) 752 if err != nil { 753 return err 754 } 755 if n != len(p) { 756 return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p)) 757 } 758 return nil 759} 760 761func (nopCompressor) Type() string { return compModeNop } 762 763// nopDecompressor is a decompressor that just copies data. 764type nopDecompressor struct{} 765 766func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) } 767func (nopDecompressor) Type() string { return compModeNop } 768