/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

/*
Package main provides gRPC benchmarks that are configured through command-line flags.

An example to run some benchmarks with profiling enabled:

go run benchmark/benchmain/main.go -benchtime=10s -workloads=all \
  -compression=gzip -maxConcurrentCalls=1 -trace=off \
  -reqSizeBytes=1,1048576 -respSizeBytes=1,1048576 -networkMode=Local \
  -cpuProfile=cpuProf -memProfile=memProf -memProfileRate=10000 -resultFile=result

As a suggestion, when creating a branch, you can run this benchmark and save the result
to a file with "-resultFile=basePerf". Later, when you are in the middle of the work or have
finished it, you can run the benchmark again and compare the result with the base at any time.

Assume there are two result files named "basePerf" and "curPerf", created by adding
-resultFile=basePerf and -resultFile=curPerf.
35 To format the curPerf, run: 36 go run benchmark/benchresult/main.go curPerf 37 To observe how the performance changes based on a base result, run: 38 go run benchmark/benchresult/main.go basePerf curPerf 39*/ 40package main 41 42import ( 43 "context" 44 "encoding/gob" 45 "flag" 46 "fmt" 47 "io" 48 "io/ioutil" 49 "log" 50 "net" 51 "os" 52 "reflect" 53 "runtime" 54 "runtime/pprof" 55 "strings" 56 "sync" 57 "sync/atomic" 58 "time" 59 60 "google.golang.org/grpc" 61 bm "google.golang.org/grpc/benchmark" 62 "google.golang.org/grpc/benchmark/flags" 63 testpb "google.golang.org/grpc/benchmark/grpc_testing" 64 "google.golang.org/grpc/benchmark/latency" 65 "google.golang.org/grpc/benchmark/stats" 66 "google.golang.org/grpc/grpclog" 67 "google.golang.org/grpc/internal/channelz" 68 "google.golang.org/grpc/keepalive" 69 "google.golang.org/grpc/test/bufconn" 70) 71 72var ( 73 workloads = flags.StringWithAllowedValues("workloads", workloadsAll, 74 fmt.Sprintf("Workloads to execute - One of: %v", strings.Join(allWorkloads, ", ")), allWorkloads) 75 traceMode = flags.StringWithAllowedValues("trace", toggleModeOff, 76 fmt.Sprintf("Trace mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) 77 preloaderMode = flags.StringWithAllowedValues("preloader", toggleModeOff, 78 fmt.Sprintf("Preloader mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) 79 channelzOn = flags.StringWithAllowedValues("channelz", toggleModeOff, 80 fmt.Sprintf("Channelz mode - One of: %v", strings.Join(allToggleModes, ", ")), allToggleModes) 81 compressorMode = flags.StringWithAllowedValues("compression", compModeOff, 82 fmt.Sprintf("Compression mode - One of: %v", strings.Join(allCompModes, ", ")), allCompModes) 83 networkMode = flags.StringWithAllowedValues("networkMode", networkModeNone, 84 "Network mode includes LAN, WAN, Local and Longhaul", allNetworkModes) 85 readLatency = flags.DurationSlice("latency", defaultReadLatency, "Simulated one-way network latency - may be 
a comma-separated list") 86 readKbps = flags.IntSlice("kbps", defaultReadKbps, "Simulated network throughput (in kbps) - may be a comma-separated list") 87 readMTU = flags.IntSlice("mtu", defaultReadMTU, "Simulated network MTU (Maximum Transmission Unit) - may be a comma-separated list") 88 maxConcurrentCalls = flags.IntSlice("maxConcurrentCalls", defaultMaxConcurrentCalls, "Number of concurrent RPCs during benchmarks") 89 readReqSizeBytes = flags.IntSlice("reqSizeBytes", nil, "Request size in bytes - may be a comma-separated list") 90 readRespSizeBytes = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list") 91 reqPayloadCurveFiles = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of request payload sizes") 92 respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of response payload sizes") 93 benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark") 94 memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.") 95 memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+ 96 "memProfile should be set before setting profile rate. To include every allocated block in the profile, "+ 97 "set MemProfileRate to 1. To turn off profiling entirely, set MemProfileRate to 0. 512 * 1024 by default.") 98 cpuProfile = flag.String("cpuProfile", "", "Enables CPU profiling output to the filename provided") 99 benchmarkResultFile = flag.String("resultFile", "", "Save the benchmark result into a binary file") 100 useBufconn = flag.Bool("bufconn", false, "Use in-memory connection instead of system network I/O") 101 enableKeepalive = flag.Bool("enable_keepalive", false, "Enable client keepalive. 
\n"+ 102 "Keepalive.Time is set to 10s, Keepalive.Timeout is set to 1s, Keepalive.PermitWithoutStream is set to true.") 103) 104 105const ( 106 workloadsUnary = "unary" 107 workloadsStreaming = "streaming" 108 workloadsUnconstrained = "unconstrained" 109 workloadsAll = "all" 110 // Compression modes. 111 compModeOff = "off" 112 compModeGzip = "gzip" 113 compModeNop = "nop" 114 compModeAll = "all" 115 // Toggle modes. 116 toggleModeOff = "off" 117 toggleModeOn = "on" 118 toggleModeBoth = "both" 119 // Network modes. 120 networkModeNone = "none" 121 networkModeLocal = "Local" 122 networkModeLAN = "LAN" 123 networkModeWAN = "WAN" 124 networkLongHaul = "Longhaul" 125 126 numStatsBuckets = 10 127 warmupCallCount = 10 128 warmuptime = time.Second 129) 130 131var ( 132 allWorkloads = []string{workloadsUnary, workloadsStreaming, workloadsUnconstrained, workloadsAll} 133 allCompModes = []string{compModeOff, compModeGzip, compModeNop, compModeAll} 134 allToggleModes = []string{toggleModeOff, toggleModeOn, toggleModeBoth} 135 allNetworkModes = []string{networkModeNone, networkModeLocal, networkModeLAN, networkModeWAN, networkLongHaul} 136 defaultReadLatency = []time.Duration{0, 40 * time.Millisecond} // if non-positive, no delay. 137 defaultReadKbps = []int{0, 10240} // if non-positive, infinite 138 defaultReadMTU = []int{0} // if non-positive, infinite 139 defaultMaxConcurrentCalls = []int{1, 8, 64, 512} 140 defaultReqSizeBytes = []int{1, 1024, 1024 * 1024} 141 defaultRespSizeBytes = []int{1, 1024, 1024 * 1024} 142 networks = map[string]latency.Network{ 143 networkModeLocal: latency.Local, 144 networkModeLAN: latency.LAN, 145 networkModeWAN: latency.WAN, 146 networkLongHaul: latency.Longhaul, 147 } 148 keepaliveTime = 10 * time.Second 149 keepaliveTimeout = 1 * time.Second 150 // This is 0.8*keepaliveTime to prevent connection issues because of server 151 // keepalive enforcement. 152 keepaliveMinTime = 8 * time.Second 153) 154 155// runModes indicates the workloads to run. 
This is initialized with a call to 156// `runModesFromWorkloads`, passing the workloads flag set by the user. 157type runModes struct { 158 unary, streaming, unconstrained bool 159} 160 161// runModesFromWorkloads determines the runModes based on the value of 162// workloads flag set by the user. 163func runModesFromWorkloads(workload string) runModes { 164 r := runModes{} 165 switch workload { 166 case workloadsUnary: 167 r.unary = true 168 case workloadsStreaming: 169 r.streaming = true 170 case workloadsUnconstrained: 171 r.unconstrained = true 172 case workloadsAll: 173 r.unary = true 174 r.streaming = true 175 r.unconstrained = true 176 default: 177 log.Fatalf("Unknown workloads setting: %v (want one of: %v)", 178 workloads, strings.Join(allWorkloads, ", ")) 179 } 180 return r 181} 182 183type startFunc func(mode string, bf stats.Features) 184type stopFunc func(count uint64) 185type ucStopFunc func(req uint64, resp uint64) 186type rpcCallFunc func(pos int) 187type rpcSendFunc func(pos int) 188type rpcRecvFunc func(pos int) 189type rpcCleanupFunc func() 190 191func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) { 192 caller, cleanup := makeFuncUnary(bf) 193 defer cleanup() 194 runBenchmark(caller, start, stop, bf, s, workloadsUnary) 195} 196 197func streamBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) { 198 caller, cleanup := makeFuncStream(bf) 199 defer cleanup() 200 runBenchmark(caller, start, stop, bf, s, workloadsStreaming) 201} 202 203func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features, s *stats.Stats) { 204 var sender rpcSendFunc 205 var recver rpcRecvFunc 206 var cleanup rpcCleanupFunc 207 if bf.EnablePreloader { 208 sender, recver, cleanup = makeFuncUnconstrainedStreamPreloaded(bf) 209 } else { 210 sender, recver, cleanup = makeFuncUnconstrainedStream(bf) 211 } 212 defer cleanup() 213 214 var req, resp uint64 215 go func() { 216 // Resets the counters once 
warmed up 217 <-time.NewTimer(warmuptime).C 218 atomic.StoreUint64(&req, 0) 219 atomic.StoreUint64(&resp, 0) 220 start(workloadsUnconstrained, bf) 221 }() 222 223 bmEnd := time.Now().Add(bf.BenchTime + warmuptime) 224 var wg sync.WaitGroup 225 wg.Add(2 * bf.MaxConcurrentCalls) 226 for i := 0; i < bf.MaxConcurrentCalls; i++ { 227 go func(pos int) { 228 defer wg.Done() 229 for { 230 t := time.Now() 231 if t.After(bmEnd) { 232 return 233 } 234 sender(pos) 235 atomic.AddUint64(&req, 1) 236 } 237 }(i) 238 go func(pos int) { 239 defer wg.Done() 240 for { 241 t := time.Now() 242 if t.After(bmEnd) { 243 return 244 } 245 recver(pos) 246 atomic.AddUint64(&resp, 1) 247 } 248 }(i) 249 } 250 wg.Wait() 251 stop(req, resp) 252} 253 254// makeClient returns a gRPC client for the grpc.testing.BenchmarkService 255// service. The client is configured using the different options in the passed 256// 'bf'. Also returns a cleanup function to close the client and release 257// resources. 258func makeClient(bf stats.Features) (testpb.BenchmarkServiceClient, func()) { 259 nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU} 260 opts := []grpc.DialOption{} 261 sopts := []grpc.ServerOption{} 262 if bf.ModeCompressor == compModeNop { 263 sopts = append(sopts, 264 grpc.RPCCompressor(nopCompressor{}), 265 grpc.RPCDecompressor(nopDecompressor{}), 266 ) 267 opts = append(opts, 268 grpc.WithCompressor(nopCompressor{}), 269 grpc.WithDecompressor(nopDecompressor{}), 270 ) 271 } 272 if bf.ModeCompressor == compModeGzip { 273 sopts = append(sopts, 274 grpc.RPCCompressor(grpc.NewGZIPCompressor()), 275 grpc.RPCDecompressor(grpc.NewGZIPDecompressor()), 276 ) 277 opts = append(opts, 278 grpc.WithCompressor(grpc.NewGZIPCompressor()), 279 grpc.WithDecompressor(grpc.NewGZIPDecompressor()), 280 ) 281 } 282 if bf.EnableKeepalive { 283 sopts = append(sopts, 284 grpc.KeepaliveParams(keepalive.ServerParameters{ 285 Time: keepaliveTime, 286 Timeout: keepaliveTimeout, 287 }), 288 
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ 289 MinTime: keepaliveMinTime, 290 PermitWithoutStream: true, 291 }), 292 ) 293 opts = append(opts, 294 grpc.WithKeepaliveParams(keepalive.ClientParameters{ 295 Time: keepaliveTime, 296 Timeout: keepaliveTimeout, 297 PermitWithoutStream: true, 298 }), 299 ) 300 } 301 sopts = append(sopts, grpc.MaxConcurrentStreams(uint32(bf.MaxConcurrentCalls+1))) 302 opts = append(opts, grpc.WithInsecure()) 303 304 var lis net.Listener 305 if bf.UseBufConn { 306 bcLis := bufconn.Listen(256 * 1024) 307 lis = bcLis 308 opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { 309 return nw.ContextDialer(func(context.Context, string, string) (net.Conn, error) { 310 return bcLis.Dial() 311 })(ctx, "", "") 312 })) 313 } else { 314 var err error 315 lis, err = net.Listen("tcp", "localhost:0") 316 if err != nil { 317 grpclog.Fatalf("Failed to listen: %v", err) 318 } 319 opts = append(opts, grpc.WithContextDialer(func(ctx context.Context, address string) (net.Conn, error) { 320 return nw.ContextDialer((&net.Dialer{}).DialContext)(ctx, "tcp", lis.Addr().String()) 321 })) 322 } 323 lis = nw.Listener(lis) 324 stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...) 325 conn := bm.NewClientConn("" /* target not used */, opts...) 
326 return testpb.NewBenchmarkServiceClient(conn), func() { 327 conn.Close() 328 stopper() 329 } 330} 331 332func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { 333 tc, cleanup := makeClient(bf) 334 return func(int) { 335 reqSizeBytes := bf.ReqSizeBytes 336 respSizeBytes := bf.RespSizeBytes 337 if bf.ReqPayloadCurve != nil { 338 reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom() 339 } 340 if bf.RespPayloadCurve != nil { 341 respSizeBytes = bf.RespPayloadCurve.ChooseRandom() 342 } 343 unaryCaller(tc, reqSizeBytes, respSizeBytes) 344 }, cleanup 345} 346 347func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) { 348 tc, cleanup := makeClient(bf) 349 350 streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) 351 for i := 0; i < bf.MaxConcurrentCalls; i++ { 352 stream, err := tc.StreamingCall(context.Background()) 353 if err != nil { 354 grpclog.Fatalf("%v.StreamingCall(_) = _, %v", tc, err) 355 } 356 streams[i] = stream 357 } 358 359 return func(pos int) { 360 reqSizeBytes := bf.ReqSizeBytes 361 respSizeBytes := bf.RespSizeBytes 362 if bf.ReqPayloadCurve != nil { 363 reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom() 364 } 365 if bf.RespPayloadCurve != nil { 366 respSizeBytes = bf.RespPayloadCurve.ChooseRandom() 367 } 368 streamCaller(streams[pos], reqSizeBytes, respSizeBytes) 369 }, cleanup 370} 371 372func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { 373 streams, req, cleanup := setupUnconstrainedStream(bf) 374 375 preparedMsg := make([]*grpc.PreparedMsg, len(streams)) 376 for i, stream := range streams { 377 preparedMsg[i] = &grpc.PreparedMsg{} 378 err := preparedMsg[i].Encode(stream, req) 379 if err != nil { 380 grpclog.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err) 381 } 382 } 383 384 return func(pos int) { 385 streams[pos].SendMsg(preparedMsg[pos]) 386 }, func(pos int) { 387 streams[pos].Recv() 388 }, cleanup 389} 390 391func 
makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) { 392 streams, req, cleanup := setupUnconstrainedStream(bf) 393 394 return func(pos int) { 395 streams[pos].Send(req) 396 }, func(pos int) { 397 streams[pos].Recv() 398 }, cleanup 399} 400 401func setupUnconstrainedStream(bf stats.Features) ([]testpb.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) { 402 tc, cleanup := makeClient(bf) 403 404 streams := make([]testpb.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls) 405 for i := 0; i < bf.MaxConcurrentCalls; i++ { 406 stream, err := tc.UnconstrainedStreamingCall(context.Background()) 407 if err != nil { 408 grpclog.Fatalf("%v.UnconstrainedStreamingCall(_) = _, %v", tc, err) 409 } 410 streams[i] = stream 411 } 412 413 pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes) 414 req := &testpb.SimpleRequest{ 415 ResponseType: pl.Type, 416 ResponseSize: int32(bf.RespSizeBytes), 417 Payload: pl, 418 } 419 420 return streams, req, cleanup 421} 422 423// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and 424// request and response sizes. 425func unaryCaller(client testpb.BenchmarkServiceClient, reqSize, respSize int) { 426 if err := bm.DoUnaryCall(client, reqSize, respSize); err != nil { 427 grpclog.Fatalf("DoUnaryCall failed: %v", err) 428 } 429} 430 431func streamCaller(stream testpb.BenchmarkService_StreamingCallClient, reqSize, respSize int) { 432 if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil { 433 grpclog.Fatalf("DoStreamingRoundTrip failed: %v", err) 434 } 435} 436 437func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) { 438 // Warm up connection. 439 for i := 0; i < warmupCallCount; i++ { 440 caller(0) 441 } 442 443 // Run benchmark. 
444 start(mode, bf) 445 var wg sync.WaitGroup 446 wg.Add(bf.MaxConcurrentCalls) 447 bmEnd := time.Now().Add(bf.BenchTime) 448 var count uint64 449 for i := 0; i < bf.MaxConcurrentCalls; i++ { 450 go func(pos int) { 451 defer wg.Done() 452 for { 453 t := time.Now() 454 if t.After(bmEnd) { 455 return 456 } 457 start := time.Now() 458 caller(pos) 459 elapse := time.Since(start) 460 atomic.AddUint64(&count, 1) 461 s.AddDuration(elapse) 462 } 463 }(i) 464 } 465 wg.Wait() 466 stop(count) 467} 468 469// benchOpts represents all configurable options available while running this 470// benchmark. This is built from the values passed as flags. 471type benchOpts struct { 472 rModes runModes 473 benchTime time.Duration 474 memProfileRate int 475 memProfile string 476 cpuProfile string 477 networkMode string 478 benchmarkResultFile string 479 useBufconn bool 480 enableKeepalive bool 481 features *featureOpts 482} 483 484// featureOpts represents options which can have multiple values. The user 485// usually provides a comma-separated list of options for each of these 486// features through command line flags. We generate all possible combinations 487// for the provided values and run the benchmarks for each combination. 488type featureOpts struct { 489 enableTrace []bool 490 readLatencies []time.Duration 491 readKbps []int 492 readMTU []int 493 maxConcurrentCalls []int 494 reqSizeBytes []int 495 respSizeBytes []int 496 reqPayloadCurves []*stats.PayloadCurve 497 respPayloadCurves []*stats.PayloadCurve 498 compModes []string 499 enableChannelz []bool 500 enablePreloader []bool 501} 502 503// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each 504// element of the slice (indexed by 'featuresIndex' enum) contains the number 505// of features to be exercised by the benchmark code. 
506// For example: Index 0 of the returned slice contains the number of values for 507// enableTrace feature, while index 1 contains the number of value of 508// readLatencies feature and so on. 509func makeFeaturesNum(b *benchOpts) []int { 510 featuresNum := make([]int, stats.MaxFeatureIndex) 511 for i := 0; i < len(featuresNum); i++ { 512 switch stats.FeatureIndex(i) { 513 case stats.EnableTraceIndex: 514 featuresNum[i] = len(b.features.enableTrace) 515 case stats.ReadLatenciesIndex: 516 featuresNum[i] = len(b.features.readLatencies) 517 case stats.ReadKbpsIndex: 518 featuresNum[i] = len(b.features.readKbps) 519 case stats.ReadMTUIndex: 520 featuresNum[i] = len(b.features.readMTU) 521 case stats.MaxConcurrentCallsIndex: 522 featuresNum[i] = len(b.features.maxConcurrentCalls) 523 case stats.ReqSizeBytesIndex: 524 featuresNum[i] = len(b.features.reqSizeBytes) 525 case stats.RespSizeBytesIndex: 526 featuresNum[i] = len(b.features.respSizeBytes) 527 case stats.ReqPayloadCurveIndex: 528 featuresNum[i] = len(b.features.reqPayloadCurves) 529 case stats.RespPayloadCurveIndex: 530 featuresNum[i] = len(b.features.respPayloadCurves) 531 case stats.CompModesIndex: 532 featuresNum[i] = len(b.features.compModes) 533 case stats.EnableChannelzIndex: 534 featuresNum[i] = len(b.features.enableChannelz) 535 case stats.EnablePreloaderIndex: 536 featuresNum[i] = len(b.features.enablePreloader) 537 default: 538 log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex) 539 } 540 } 541 return featuresNum 542} 543 544// sharedFeatures returns a bool slice which acts as a bitmask. Each item in 545// the slice represents a feature, indexed by 'featureIndex' enum. The bit is 546// set to 1 if the corresponding feature does not have multiple value, so is 547// shared amongst all benchmarks. 
548func sharedFeatures(featuresNum []int) []bool { 549 result := make([]bool, len(featuresNum)) 550 for i, num := range featuresNum { 551 if num <= 1 { 552 result[i] = true 553 } 554 } 555 return result 556} 557 558// generateFeatures generates all combinations of the provided feature options. 559// While all the feature options are stored in the benchOpts struct, the input 560// parameter 'featuresNum' is a slice indexed by 'featureIndex' enum containing 561// the number of values for each feature. 562// For example, let's say the user sets -workloads=all and 563// -maxConcurrentCalls=1,100, this would end up with the following 564// combinations: 565// [workloads: unary, maxConcurrentCalls=1] 566// [workloads: unary, maxConcurrentCalls=1] 567// [workloads: streaming, maxConcurrentCalls=100] 568// [workloads: streaming, maxConcurrentCalls=100] 569// [workloads: unconstrained, maxConcurrentCalls=1] 570// [workloads: unconstrained, maxConcurrentCalls=100] 571func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features { 572 // curPos and initialPos are two slices where each value acts as an index 573 // into the appropriate feature slice maintained in benchOpts.features. This 574 // loop generates all possible combinations of features by changing one value 575 // at a time, and once curPos becomes equal to initialPos, we have explored 576 // all options. 577 var result []stats.Features 578 var curPos []int 579 initialPos := make([]int, stats.MaxFeatureIndex) 580 for !reflect.DeepEqual(initialPos, curPos) { 581 if curPos == nil { 582 curPos = make([]int, stats.MaxFeatureIndex) 583 } 584 f := stats.Features{ 585 // These features stay the same for each iteration. 586 NetworkMode: b.networkMode, 587 UseBufConn: b.useBufconn, 588 EnableKeepalive: b.enableKeepalive, 589 BenchTime: b.benchTime, 590 // These features can potentially change for each iteration. 
591 EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]], 592 Latency: b.features.readLatencies[curPos[stats.ReadLatenciesIndex]], 593 Kbps: b.features.readKbps[curPos[stats.ReadKbpsIndex]], 594 MTU: b.features.readMTU[curPos[stats.ReadMTUIndex]], 595 MaxConcurrentCalls: b.features.maxConcurrentCalls[curPos[stats.MaxConcurrentCallsIndex]], 596 ModeCompressor: b.features.compModes[curPos[stats.CompModesIndex]], 597 EnableChannelz: b.features.enableChannelz[curPos[stats.EnableChannelzIndex]], 598 EnablePreloader: b.features.enablePreloader[curPos[stats.EnablePreloaderIndex]], 599 } 600 if len(b.features.reqPayloadCurves) == 0 { 601 f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]] 602 } else { 603 f.ReqPayloadCurve = b.features.reqPayloadCurves[curPos[stats.ReqPayloadCurveIndex]] 604 } 605 if len(b.features.respPayloadCurves) == 0 { 606 f.RespSizeBytes = b.features.respSizeBytes[curPos[stats.RespSizeBytesIndex]] 607 } else { 608 f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]] 609 } 610 result = append(result, f) 611 addOne(curPos, featuresNum) 612 } 613 return result 614} 615 616// addOne mutates the input slice 'features' by changing one feature, thus 617// arriving at the next combination of feature values. 'featuresMaxPosition' 618// provides the numbers of allowed values for each feature, indexed by 619// 'featureIndex' enum. 620func addOne(features []int, featuresMaxPosition []int) { 621 for i := len(features) - 1; i >= 0; i-- { 622 if featuresMaxPosition[i] == 0 { 623 continue 624 } 625 features[i] = (features[i] + 1) 626 if features[i]/featuresMaxPosition[i] == 0 { 627 break 628 } 629 features[i] = features[i] % featuresMaxPosition[i] 630 } 631} 632 633// processFlags reads the command line flags and builds benchOpts. Specifying 634// invalid values for certain flags will cause flag.Parse() to fail, and the 635// program to terminate. 
636// This *SHOULD* be the only place where the flags are accessed. All other 637// parts of the benchmark code should rely on the returned benchOpts. 638func processFlags() *benchOpts { 639 flag.Parse() 640 if flag.NArg() != 0 { 641 log.Fatal("Error: unparsed arguments: ", flag.Args()) 642 } 643 644 opts := &benchOpts{ 645 rModes: runModesFromWorkloads(*workloads), 646 benchTime: *benchTime, 647 memProfileRate: *memProfileRate, 648 memProfile: *memProfile, 649 cpuProfile: *cpuProfile, 650 networkMode: *networkMode, 651 benchmarkResultFile: *benchmarkResultFile, 652 useBufconn: *useBufconn, 653 enableKeepalive: *enableKeepalive, 654 features: &featureOpts{ 655 enableTrace: setToggleMode(*traceMode), 656 readLatencies: append([]time.Duration(nil), *readLatency...), 657 readKbps: append([]int(nil), *readKbps...), 658 readMTU: append([]int(nil), *readMTU...), 659 maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...), 660 reqSizeBytes: append([]int(nil), *readReqSizeBytes...), 661 respSizeBytes: append([]int(nil), *readRespSizeBytes...), 662 compModes: setCompressorMode(*compressorMode), 663 enableChannelz: setToggleMode(*channelzOn), 664 enablePreloader: setToggleMode(*preloaderMode), 665 }, 666 } 667 668 if len(*reqPayloadCurveFiles) == 0 { 669 if len(opts.features.reqSizeBytes) == 0 { 670 opts.features.reqSizeBytes = defaultReqSizeBytes 671 } 672 } else { 673 if len(opts.features.reqSizeBytes) != 0 { 674 log.Fatalf("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time") 675 } 676 for _, file := range *reqPayloadCurveFiles { 677 pc, err := stats.NewPayloadCurve(file) 678 if err != nil { 679 log.Fatalf("cannot load payload curve file %s: %v", file, err) 680 } 681 opts.features.reqPayloadCurves = append(opts.features.reqPayloadCurves, pc) 682 } 683 opts.features.reqSizeBytes = nil 684 } 685 if len(*respPayloadCurveFiles) == 0 { 686 if len(opts.features.respSizeBytes) == 0 { 687 opts.features.respSizeBytes = defaultRespSizeBytes 688 } 
689 } else { 690 if len(opts.features.respSizeBytes) != 0 { 691 log.Fatalf("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time") 692 } 693 for _, file := range *respPayloadCurveFiles { 694 pc, err := stats.NewPayloadCurve(file) 695 if err != nil { 696 log.Fatalf("cannot load payload curve file %s: %v", file, err) 697 } 698 opts.features.respPayloadCurves = append(opts.features.respPayloadCurves, pc) 699 } 700 opts.features.respSizeBytes = nil 701 } 702 703 // Re-write latency, kpbs and mtu if network mode is set. 704 if network, ok := networks[opts.networkMode]; ok { 705 opts.features.readLatencies = []time.Duration{network.Latency} 706 opts.features.readKbps = []int{network.Kbps} 707 opts.features.readMTU = []int{network.MTU} 708 } 709 return opts 710} 711 712func setToggleMode(val string) []bool { 713 switch val { 714 case toggleModeOn: 715 return []bool{true} 716 case toggleModeOff: 717 return []bool{false} 718 case toggleModeBoth: 719 return []bool{false, true} 720 default: 721 // This should never happen because a wrong value passed to this flag would 722 // be caught during flag.Parse(). 723 return []bool{} 724 } 725} 726 727func setCompressorMode(val string) []string { 728 switch val { 729 case compModeNop, compModeGzip, compModeOff: 730 return []string{val} 731 case compModeAll: 732 return []string{compModeNop, compModeGzip, compModeOff} 733 default: 734 // This should never happen because a wrong value passed to this flag would 735 // be caught during flag.Parse(). 
736 return []string{} 737 } 738} 739 740func main() { 741 opts := processFlags() 742 before(opts) 743 744 s := stats.NewStats(numStatsBuckets) 745 featuresNum := makeFeaturesNum(opts) 746 sf := sharedFeatures(featuresNum) 747 748 var ( 749 start = func(mode string, bf stats.Features) { s.StartRun(mode, bf, sf) } 750 stop = func(count uint64) { s.EndRun(count) } 751 ucStop = func(req uint64, resp uint64) { s.EndUnconstrainedRun(req, resp) } 752 ) 753 754 for _, bf := range opts.generateFeatures(featuresNum) { 755 grpc.EnableTracing = bf.EnableTrace 756 if bf.EnableChannelz { 757 channelz.TurnOn() 758 } 759 if opts.rModes.unary { 760 unaryBenchmark(start, stop, bf, s) 761 } 762 if opts.rModes.streaming { 763 streamBenchmark(start, stop, bf, s) 764 } 765 if opts.rModes.unconstrained { 766 unconstrainedStreamBenchmark(start, ucStop, bf, s) 767 } 768 } 769 after(opts, s.GetResults()) 770} 771 772func before(opts *benchOpts) { 773 if opts.memProfile != "" { 774 runtime.MemProfileRate = opts.memProfileRate 775 } 776 if opts.cpuProfile != "" { 777 f, err := os.Create(opts.cpuProfile) 778 if err != nil { 779 fmt.Fprintf(os.Stderr, "testing: %s\n", err) 780 return 781 } 782 if err := pprof.StartCPUProfile(f); err != nil { 783 fmt.Fprintf(os.Stderr, "testing: can't start cpu profile: %s\n", err) 784 f.Close() 785 return 786 } 787 } 788} 789 790func after(opts *benchOpts, data []stats.BenchResults) { 791 if opts.cpuProfile != "" { 792 pprof.StopCPUProfile() // flushes profile to disk 793 } 794 if opts.memProfile != "" { 795 f, err := os.Create(opts.memProfile) 796 if err != nil { 797 fmt.Fprintf(os.Stderr, "testing: %s\n", err) 798 os.Exit(2) 799 } 800 runtime.GC() // materialize all statistics 801 if err = pprof.WriteHeapProfile(f); err != nil { 802 fmt.Fprintf(os.Stderr, "testing: can't write heap profile %s: %s\n", opts.memProfile, err) 803 os.Exit(2) 804 } 805 f.Close() 806 } 807 if opts.benchmarkResultFile != "" { 808 f, err := os.Create(opts.benchmarkResultFile) 809 if 
err != nil { 810 log.Fatalf("testing: can't write benchmark result %s: %s\n", opts.benchmarkResultFile, err) 811 } 812 dataEncoder := gob.NewEncoder(f) 813 dataEncoder.Encode(data) 814 f.Close() 815 } 816} 817 818// nopCompressor is a compressor that just copies data. 819type nopCompressor struct{} 820 821func (nopCompressor) Do(w io.Writer, p []byte) error { 822 n, err := w.Write(p) 823 if err != nil { 824 return err 825 } 826 if n != len(p) { 827 return fmt.Errorf("nopCompressor.Write: wrote %v bytes; want %v", n, len(p)) 828 } 829 return nil 830} 831 832func (nopCompressor) Type() string { return compModeNop } 833 834// nopDecompressor is a decompressor that just copies data. 835type nopDecompressor struct{} 836 837func (nopDecompressor) Do(r io.Reader) ([]byte, error) { return ioutil.ReadAll(r) } 838func (nopDecompressor) Type() string { return compModeNop } 839