1package main
2
3import (
4	"context"
5	"encoding/json"
6	"flag"
7	"fmt"
8	"io/ioutil"
9	"net/http"
10	"runtime"
11	"sync"
12	"time"
13
14	"github.com/lightstep/lightstep-tracer-go"
15	"github.com/opentracing/opentracing-go"
16)
17
// Fixed settings for talking to the benchmark controller and for sizing the
// log payload.
const (
	// HTTP paths on the controller: controlPath is fetched for the next
	// control message, resultPath receives the measurements of a finished run.
	controlPath = "/control"
	resultPath  = "/result"

	// controllerHost is used both for the controller's HTTP base URL (with
	// controllerPort) and for the tracer's collector endpoint (with grpcPort).
	controllerHost = "localhost"
	controllerPort = 8000
	grpcPort       = 8001

	// controllerAccessToken is the access token handed to the tracer.
	// NOTE(review): the value suggests the controller does not validate it;
	// confirm against the controller implementation.
	controllerAccessToken = "ignored"

	// logsSizeMax is the length in bytes (1 MiB) of the pre-built log payload.
	logsSizeMax = 1 << 20
)
27
// logPayloadStr is the shared log payload; init fills it with logsSizeMax
// bytes and testBody logs a prefix of it with every log event.
var logPayloadStr string
31
// fatal aborts the benchmark by panicking. The panic value is the string
// produced by formatting the arguments with fmt.Sprintln (operands separated
// by spaces, trailing newline included).
func fatal(x ...interface{}) {
	msg := fmt.Sprintln(x...)
	panic(msg)
}
35
36func init() {
37	lps := make([]byte, logsSizeMax)
38	for i := 0; i < len(lps); i++ {
39		lps[i] = 'A' + byte(i%26)
40	}
41	logPayloadStr = string(lps)
42}
43
// control is one benchmark instruction. The client decodes it from the JSON
// body of the controller's control response and executes it in run.
type control struct {
	// Concurrent is the number of goroutines that execute the test body
	// (and the GOMAXPROCS value used for the run).
	Concurrent int

	// Work is the iteration count passed to work() under each span;
	// Repeat is the number of spans each goroutine produces.
	Work, Repeat int64

	// Sleep is the amount of sleep owed after each span. The debt is only
	// paid off, in one time.Sleep call, once it exceeds SleepInterval.
	Sleep, SleepInterval time.Duration

	// Each span gets NumLogs log events, each carrying the first
	// BytesPerLog bytes of the shared payload.
	BytesPerLog, NumLogs int64

	// Trace selects the real tracer instead of the no-op tracer.
	// Exit ends the client's control loop.
	// Profile is not read by the code visible in this file.
	Trace, Exit, Profile bool
}
67
// testClient is the benchmark client: it fetches control messages from the
// controller, runs them, and reports the measurements back.
type testClient struct {
	// baseURL is the scheme, host and port of the controller; request paths
	// are appended to it verbatim.
	baseURL string
	// tracer is installed as the global tracer for runs with Trace set and
	// is flushed at the end of such runs.
	tracer  opentracing.Tracer
}
72
// work is the synthetic CPU load performed under each span. It starts from a
// fixed prime and multiplies the accumulator by that prime n times, relying
// on int64 wrap-around, and returns the accumulator so the result can be
// reported back.
//
// A non-positive n performs no multiplications and returns the prime itself.
// (Looping until n reaches exactly zero would make a negative n spin through
// nearly the entire int64 range.)
func work(n int64) int64 {
	const primeWork = 982451653
	x := int64(primeWork)
	for ; n > 0; n-- {
		x *= primeWork
	}
	return x
}
82
83func (t *testClient) getURL(path string) []byte {
84	resp, err := http.Get(t.baseURL + path)
85	if err != nil {
86		fatal("Bench control request failed: ", err)
87	}
88	if resp.StatusCode != 200 {
89		fatal("Bench control status != 200: ", resp.Status, ": ", path)
90	}
91
92	defer resp.Body.Close()
93	body, err := ioutil.ReadAll(resp.Body)
94	if err != nil {
95		fatal("Bench error reading body: ", err)
96	}
97	return body
98}
99
100func (t *testClient) loop() {
101	for {
102		body := t.getURL(controlPath)
103
104		ctrl := control{}
105		if err := json.Unmarshal(body, &ctrl); err != nil {
106			fatal("Bench control parse error: ", err)
107		}
108		if ctrl.Exit {
109			return
110		}
111		timing, flusht, sleeps, answer := t.run(&ctrl)
112		t.getURL(fmt.Sprintf(
113			"%s?timing=%.9f&flush=%.9f&s=%.9f&a=%d",
114			resultPath,
115			timing.Seconds(),
116			flusht.Seconds(),
117			sleeps.Seconds(),
118			answer))
119	}
120}
121
122func testBody(control *control) (time.Duration, int64) {
123	var sleepDebt time.Duration
124	var answer int64
125	var totalSleep time.Duration
126	for i := int64(0); i < control.Repeat; i++ {
127		span := opentracing.StartSpan("span/test")
128		answer = work(control.Work)
129		for i := int64(0); i < control.NumLogs; i++ {
130			span.LogEventWithPayload("testlog",
131				logPayloadStr[0:control.BytesPerLog])
132		}
133		span.Finish()
134		sleepDebt += control.Sleep
135		if sleepDebt <= control.SleepInterval {
136			continue
137		}
138		begin := time.Now()
139		time.Sleep(sleepDebt)
140		elapsed := time.Since(begin)
141		sleepDebt -= elapsed
142		totalSleep += elapsed
143	}
144	return totalSleep, answer
145}
146
147func (t *testClient) run(control *control) (time.Duration, time.Duration, time.Duration, int64) {
148	if control.Trace {
149		opentracing.InitGlobalTracer(t.tracer)
150	} else {
151		opentracing.InitGlobalTracer(opentracing.NoopTracer{})
152	}
153	conc := control.Concurrent
154	runtime.GOMAXPROCS(conc)
155	runtime.GC()
156	runtime.Gosched()
157
158	var sleeps time.Duration
159	var answer int64
160
161	beginTest := time.Now()
162	if conc == 1 {
163		s, a := testBody(control)
164		sleeps += s
165		answer += a
166	} else {
167		start := &sync.WaitGroup{}
168		finish := &sync.WaitGroup{}
169		start.Add(conc)
170		finish.Add(conc)
171		for c := 0; c < conc; c++ {
172			go func() {
173				start.Done()
174				start.Wait()
175				s, a := testBody(control)
176				sleeps += s
177				answer += a
178				finish.Done()
179			}()
180		}
181		finish.Wait()
182	}
183	endTime := time.Now()
184	flushDur := time.Duration(0)
185	if control.Trace {
186		recorder, ok := t.tracer.(lightstep.Tracer)
187		if !ok {
188			panic("Tracer does not have a lightstep recorder")
189		}
190		recorder.Flush(context.Background())
191		flushDur = time.Since(endTime)
192	}
193	return endTime.Sub(beginTest), flushDur, sleeps, answer
194}
195
196func main() {
197	flag.Parse()
198	tc := &testClient{
199		baseURL: fmt.Sprint("http://",
200			controllerHost, ":",
201			controllerPort),
202		tracer: lightstep.NewTracer(lightstep.Options{
203			AccessToken: controllerAccessToken,
204			Collector: lightstep.Endpoint{
205				Host:      controllerHost,
206				Port:      grpcPort,
207				Plaintext: true,
208			},
209		}),
210	}
211	tc.loop()
212}
213