1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "flag" 7 "fmt" 8 "io/ioutil" 9 "net/http" 10 "runtime" 11 "sync" 12 "time" 13 14 "github.com/lightstep/lightstep-tracer-go" 15 "github.com/opentracing/opentracing-go" 16) 17 18const ( 19 controlPath = "/control" 20 resultPath = "/result" 21 controllerPort = 8000 22 grpcPort = 8001 23 controllerHost = "localhost" 24 controllerAccessToken = "ignored" 25 logsSizeMax = 1 << 20 26) 27 28var ( 29 logPayloadStr string 30) 31 32func fatal(x ...interface{}) { 33 panic(fmt.Sprintln(x...)) 34} 35 36func init() { 37 lps := make([]byte, logsSizeMax) 38 for i := 0; i < len(lps); i++ { 39 lps[i] = 'A' + byte(i%26) 40 } 41 logPayloadStr = string(lps) 42} 43 44type control struct { 45 Concurrent int // How many routines, threads, etc. 46 47 // How much work to perform under one span 48 Work int64 49 50 // How many repetitions 51 Repeat int64 52 53 // How many amortized nanoseconds to sleep after each span 54 Sleep time.Duration 55 // How many nanoseconds to sleep at once 56 SleepInterval time.Duration 57 58 // How many bytes per log statement 59 BytesPerLog int64 60 NumLogs int64 61 62 // Misc control bits 63 Trace bool // Trace the operation. 64 Exit bool // Terminate the test. 65 Profile bool // Profile this operation 66} 67 68type testClient struct { 69 baseURL string 70 tracer opentracing.Tracer 71} 72 73func work(n int64) int64 { 74 const primeWork = 982451653 75 x := int64(primeWork) 76 for n != 0 { 77 x *= primeWork 78 n-- 79 } 80 return x 81} 82 83func (t *testClient) getURL(path string) []byte { 84 resp, err := http.Get(t.baseURL + path) 85 if err != nil { 86 fatal("Bench control request failed: ", err) 87 } 88 if resp.StatusCode != 200 { 89 fatal("Bench control status != 200: ", resp.Status, ": ", path) 90 } 91 92 defer resp.Body.Close() 93 body, err := ioutil.ReadAll(resp.Body) 94 if err != nil { 95 fatal("Bench error reading body: ", err) 96 } 97 return body 98} 99 100func (t *testClient) loop() { 101 for { 102 body := t.getURL(controlPath) 103 104 ctrl := control{} 105 if err := json.Unmarshal(body, &ctrl); err != nil { 106 fatal("Bench control parse error: ", err) 107 } 108 if ctrl.Exit { 109 return 110 } 111 timing, flusht, sleeps, answer := t.run(&ctrl) 112 t.getURL(fmt.Sprintf( 113 "%s?timing=%.9f&flush=%.9f&s=%.9f&a=%d", 114 resultPath, 115 timing.Seconds(), 116 flusht.Seconds(), 117 sleeps.Seconds(), 118 answer)) 119 } 120} 121 122func testBody(control *control) (time.Duration, int64) { 123 var sleepDebt time.Duration 124 var answer int64 125 var totalSleep time.Duration 126 for i := int64(0); i < control.Repeat; i++ { 127 span := opentracing.StartSpan("span/test") 128 answer = work(control.Work) 129 for i := int64(0); i < control.NumLogs; i++ { 130 span.LogEventWithPayload("testlog", 131 logPayloadStr[0:control.BytesPerLog]) 132 } 133 span.Finish() 134 sleepDebt += control.Sleep 135 if sleepDebt <= control.SleepInterval { 136 continue 137 } 138 begin := time.Now() 139 time.Sleep(sleepDebt) 140 elapsed := time.Since(begin) 141 sleepDebt -= elapsed 142 totalSleep += elapsed 143 } 144 return totalSleep, answer 145} 146 147func (t *testClient) run(control *control) (time.Duration, time.Duration, time.Duration, int64) { 148 if control.Trace { 149 opentracing.InitGlobalTracer(t.tracer) 150 } else { 151 opentracing.InitGlobalTracer(opentracing.NoopTracer{}) 152 } 153 conc := control.Concurrent 154 runtime.GOMAXPROCS(conc) 155 runtime.GC() 156 runtime.Gosched() 157 158 var sleeps time.Duration 159 var answer int64 160 161 beginTest := time.Now() 162 if conc == 1 { 163 s, a := testBody(control) 164 sleeps += s 165 answer += a 166 } else { 167 start := &sync.WaitGroup{} 168 finish := &sync.WaitGroup{} 169 start.Add(conc) 170 finish.Add(conc) 171 for c := 0; c < conc; c++ { 172 go func() { 173 start.Done() 174 start.Wait() 175 s, a := testBody(control) 176 sleeps += s 177 answer += a 178 finish.Done() 179 }() 180 } 181 finish.Wait() 182 } 183 endTime := time.Now() 184 flushDur := time.Duration(0) 185 if control.Trace { 186 recorder, ok := t.tracer.(lightstep.Tracer) 187 if !ok { 188 panic("Tracer does not have a lightstep recorder") 189 } 190 recorder.Flush(context.Background()) 191 flushDur = time.Since(endTime) 192 } 193 return endTime.Sub(beginTest), flushDur, sleeps, answer 194} 195 196func main() { 197 flag.Parse() 198 tc := &testClient{ 199 baseURL: fmt.Sprint("http://", 200 controllerHost, ":", 201 controllerPort), 202 tracer: lightstep.NewTracer(lightstep.Options{ 203 AccessToken: controllerAccessToken, 204 Collector: lightstep.Endpoint{ 205 Host: controllerHost, 206 Port: grpcPort, 207 Plaintext: true, 208 }, 209 }), 210 } 211 tc.loop() 212} 213