1/* 2Copyright 2016 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17/* 18Scantest does scan-related load testing against Cloud Bigtable. The logic here 19mimics a similar test written using the Java client. 20*/ 21package main 22 23import ( 24 "bytes" 25 "context" 26 "flag" 27 "fmt" 28 "log" 29 "math/rand" 30 "os" 31 "sync" 32 "sync/atomic" 33 "text/tabwriter" 34 "time" 35 36 "cloud.google.com/go/bigtable" 37 "cloud.google.com/go/bigtable/internal/cbtconfig" 38 "cloud.google.com/go/bigtable/internal/stat" 39) 40 41var ( 42 runFor = flag.Duration("run_for", 5*time.Second, "how long to run the load test for") 43 numScans = flag.Int("concurrent_scans", 1, "number of concurrent scans") 44 rowLimit = flag.Int("row_limit", 10000, "max number of records per scan") 45 46 config *cbtconfig.Config 47 client *bigtable.Client 48) 49 50func main() { 51 flag.Usage = func() { 52 fmt.Printf("Usage: scantest [options] <table_name>\n\n") 53 flag.PrintDefaults() 54 } 55 56 var err error 57 config, err = cbtconfig.Load() 58 if err != nil { 59 log.Fatal(err) 60 } 61 config.RegisterFlags() 62 63 flag.Parse() 64 if err := config.CheckFlags(cbtconfig.ProjectAndInstanceRequired); err != nil { 65 log.Fatal(err) 66 } 67 if config.Creds != "" { 68 os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", config.Creds) 69 } 70 if flag.NArg() != 1 { 71 flag.Usage() 72 os.Exit(1) 73 } 74 75 table := flag.Arg(0) 76 77 log.Printf("Dialing connections...") 78 client, err = bigtable.NewClient(context.Background(), config.Project, config.Instance) 79 if err != nil { 80 log.Fatalf("Making bigtable.Client: %v", err) 81 } 82 defer client.Close() 83 84 log.Printf("Starting scan test... (run for %v)", *runFor) 85 tbl := client.Open(table) 86 sem := make(chan int, *numScans) // limit the number of requests happening at once 87 var scans stats 88 89 stopTime := time.Now().Add(*runFor) 90 var wg sync.WaitGroup 91 for time.Now().Before(stopTime) { 92 sem <- 1 93 wg.Add(1) 94 go func() { 95 defer wg.Done() 96 defer func() { <-sem }() 97 98 ok := true 99 opStart := time.Now() 100 defer func() { 101 scans.Record(ok, time.Since(opStart)) 102 }() 103 104 // Start at a random row key 105 key := fmt.Sprintf("user%d", rand.Int63()) 106 limit := bigtable.LimitRows(int64(*rowLimit)) 107 noop := func(bigtable.Row) bool { return true } 108 if err := tbl.ReadRows(context.Background(), bigtable.NewRange(key, ""), noop, limit); err != nil { 109 log.Printf("Error during scan: %v", err) 110 ok = false 111 } 112 }() 113 } 114 wg.Wait() 115 116 agg := stat.NewAggregate("scans", scans.ds, scans.tries-scans.ok) 117 log.Printf("Scans (%d ok / %d tries):\nscan times:\n%v\nthroughput (rows/second):\n%v", 118 scans.ok, scans.tries, agg, throughputString(agg)) 119} 120 121func throughputString(agg *stat.Aggregate) string { 122 var buf bytes.Buffer 123 tw := tabwriter.NewWriter(&buf, 0, 0, 1, ' ', 0) // one-space padding 124 rowLimitF := float64(*rowLimit) 125 fmt.Fprintf( 126 tw, 127 "min:\t%.2f\nmedian:\t%.2f\nmax:\t%.2f\n", 128 rowLimitF/agg.Max.Seconds(), 129 rowLimitF/agg.Median.Seconds(), 130 rowLimitF/agg.Min.Seconds()) 131 tw.Flush() 132 return buf.String() 133} 134 135var allStats int64 // atomic 136 137type stats struct { 138 mu sync.Mutex 139 tries, ok int 140 ds []time.Duration 141} 142 143func (s *stats) Record(ok bool, d time.Duration) { 144 s.mu.Lock() 145 s.tries++ 146 if ok { 147 s.ok++ 148 } 149 s.ds = append(s.ds, d) 150 s.mu.Unlock() 151 152 if n := atomic.AddInt64(&allStats, 1); n%1000 == 0 { 153 log.Printf("Progress: done %d ops", n) 154 } 155} 156