1/*
2Copyright 2016 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17/*
18Scantest does scan-related load testing against Cloud Bigtable. The logic here
19mimics a similar test written using the Java client.
20*/
21package main
22
23import (
24	"bytes"
25	"context"
26	"flag"
27	"fmt"
28	"log"
29	"math/rand"
30	"os"
31	"sync"
32	"sync/atomic"
33	"text/tabwriter"
34	"time"
35
36	"cloud.google.com/go/bigtable"
37	"cloud.google.com/go/bigtable/internal/cbtconfig"
38	"cloud.google.com/go/bigtable/internal/stat"
39)
40
41var (
42	runFor   = flag.Duration("run_for", 5*time.Second, "how long to run the load test for")
43	numScans = flag.Int("concurrent_scans", 1, "number of concurrent scans")
44	rowLimit = flag.Int("row_limit", 10000, "max number of records per scan")
45
46	config *cbtconfig.Config
47	client *bigtable.Client
48)
49
50func main() {
51	flag.Usage = func() {
52		fmt.Printf("Usage: scantest [options] <table_name>\n\n")
53		flag.PrintDefaults()
54	}
55
56	var err error
57	config, err = cbtconfig.Load()
58	if err != nil {
59		log.Fatal(err)
60	}
61	config.RegisterFlags()
62
63	flag.Parse()
64	if err := config.CheckFlags(cbtconfig.ProjectAndInstanceRequired); err != nil {
65		log.Fatal(err)
66	}
67	if config.Creds != "" {
68		os.Setenv("GOOGLE_APPLICATION_CREDENTIALS", config.Creds)
69	}
70	if flag.NArg() != 1 {
71		flag.Usage()
72		os.Exit(1)
73	}
74
75	table := flag.Arg(0)
76
77	log.Printf("Dialing connections...")
78	client, err = bigtable.NewClient(context.Background(), config.Project, config.Instance)
79	if err != nil {
80		log.Fatalf("Making bigtable.Client: %v", err)
81	}
82	defer client.Close()
83
84	log.Printf("Starting scan test... (run for %v)", *runFor)
85	tbl := client.Open(table)
86	sem := make(chan int, *numScans) // limit the number of requests happening at once
87	var scans stats
88
89	stopTime := time.Now().Add(*runFor)
90	var wg sync.WaitGroup
91	for time.Now().Before(stopTime) {
92		sem <- 1
93		wg.Add(1)
94		go func() {
95			defer wg.Done()
96			defer func() { <-sem }()
97
98			ok := true
99			opStart := time.Now()
100			defer func() {
101				scans.Record(ok, time.Since(opStart))
102			}()
103
104			// Start at a random row key
105			key := fmt.Sprintf("user%d", rand.Int63())
106			limit := bigtable.LimitRows(int64(*rowLimit))
107			noop := func(bigtable.Row) bool { return true }
108			if err := tbl.ReadRows(context.Background(), bigtable.NewRange(key, ""), noop, limit); err != nil {
109				log.Printf("Error during scan: %v", err)
110				ok = false
111			}
112		}()
113	}
114	wg.Wait()
115
116	agg := stat.NewAggregate("scans", scans.ds, scans.tries-scans.ok)
117	log.Printf("Scans (%d ok / %d tries):\nscan times:\n%v\nthroughput (rows/second):\n%v",
118		scans.ok, scans.tries, agg, throughputString(agg))
119}
120
121func throughputString(agg *stat.Aggregate) string {
122	var buf bytes.Buffer
123	tw := tabwriter.NewWriter(&buf, 0, 0, 1, ' ', 0) // one-space padding
124	rowLimitF := float64(*rowLimit)
125	fmt.Fprintf(
126		tw,
127		"min:\t%.2f\nmedian:\t%.2f\nmax:\t%.2f\n",
128		rowLimitF/agg.Max.Seconds(),
129		rowLimitF/agg.Median.Seconds(),
130		rowLimitF/agg.Min.Seconds())
131	tw.Flush()
132	return buf.String()
133}
134
// allStats is the number of operations recorded so far across every stats
// value. It is only updated through the sync/atomic functions.
var allStats int64

// stats accumulates the outcomes and durations of a series of operations.
type stats struct {
	mu sync.Mutex // held while tries, ok and ds are updated

	tries int             // operations recorded
	ok    int             // operations recorded as successful
	ds    []time.Duration // duration of each recorded operation, in recording order
}

// Record adds one operation with outcome ok and duration d to s. It then bumps
// the global operation counter and logs a progress line whenever that counter
// reaches a multiple of 1000.
func (s *stats) Record(ok bool, d time.Duration) {
	s.tally(ok, d)

	// The counter update and the logging happen after the lock is released.
	total := atomic.AddInt64(&allStats, 1)
	if total%1000 != 0 {
		return
	}
	log.Printf("Progress: done %d ops", total)
}

// tally stores a single sample in s while holding s.mu.
func (s *stats) tally(ok bool, d time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.ds = append(s.ds, d)
	s.tries++
	if ok {
		s.ok++
	}
}
156