1/*
2Copyright 2020 Google LLC
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8    http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package spanner
18
19import (
20	"context"
21	"fmt"
22	"math/rand"
23	"reflect"
24	"sync"
25	"testing"
26	"time"
27
28	. "cloud.google.com/go/spanner/internal/testutil"
29	"google.golang.org/api/iterator"
30	sppb "google.golang.org/genproto/googleapis/spanner/v1"
31)
32
33const networkLatencyTime = 10 * time.Millisecond
34const batchCreateSessionsMinTime = 10 * time.Millisecond
35const batchCreateSessionsRndTime = 10 * time.Millisecond
36const beginTransactionMinTime = 1 * time.Millisecond
37const beginTransactionRndTime = 1 * time.Millisecond
38const commitTransactionMinTime = 5 * time.Millisecond
39const commitTransactionRndTime = 5 * time.Millisecond
40const executeStreamingSqlMinTime = 10 * time.Millisecond
41const executeStreamingSqlRndTime = 10 * time.Millisecond
42const executeSqlMinTime = 10 * time.Millisecond
43const executeSqlRndTime = 10 * time.Millisecond
44
45const holdSessionTime = 100
46const rndWaitTimeBetweenRequests = 10
47
48var mu sync.Mutex
49var rnd = rand.New(rand.NewSource(time.Now().UnixNano()))
50
51func createBenchmarkServer(incStep uint64) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) {
52	t := &testing.T{}
53	server, client, teardown = setupMockedTestServerWithConfig(t, ClientConfig{
54		SessionPoolConfig: SessionPoolConfig{
55			MinOpened:     100,
56			MaxOpened:     400,
57			WriteSessions: 0.2,
58			incStep:       incStep,
59		},
60	})
61	server.TestSpanner.PutExecutionTime(MethodBatchCreateSession, SimulatedExecutionTime{
62		MinimumExecutionTime: networkLatencyTime + batchCreateSessionsMinTime,
63		RandomExecutionTime:  batchCreateSessionsRndTime,
64	})
65	server.TestSpanner.PutExecutionTime(MethodCreateSession, SimulatedExecutionTime{
66		MinimumExecutionTime: networkLatencyTime + batchCreateSessionsMinTime,
67		RandomExecutionTime:  batchCreateSessionsRndTime,
68	})
69	server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, SimulatedExecutionTime{
70		MinimumExecutionTime: networkLatencyTime + executeStreamingSqlMinTime,
71		RandomExecutionTime:  executeStreamingSqlRndTime,
72	})
73	server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{
74		MinimumExecutionTime: networkLatencyTime + beginTransactionMinTime,
75		RandomExecutionTime:  beginTransactionRndTime,
76	})
77	server.TestSpanner.PutExecutionTime(MethodCommitTransaction, SimulatedExecutionTime{
78		MinimumExecutionTime: networkLatencyTime + commitTransactionMinTime,
79		RandomExecutionTime:  commitTransactionRndTime,
80	})
81	server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{
82		MinimumExecutionTime: networkLatencyTime + executeSqlMinTime,
83		RandomExecutionTime:  executeSqlRndTime,
84	})
85	// Wait until the session pool has been initialized.
86	waitFor(t, func() error {
87		if uint64(client.idleSessions.idleList.Len()+client.idleSessions.idleWriteList.Len()) == client.idleSessions.MinOpened {
88			return nil
89		}
90		return fmt.Errorf("not yet initialized")
91	})
92	return
93}
94
95func readWorker(client *Client, b *testing.B, jobs <-chan int, results chan<- int) {
96	for range jobs {
97		mu.Lock()
98		d := time.Millisecond * time.Duration(rnd.Int63n(rndWaitTimeBetweenRequests))
99		mu.Unlock()
100		time.Sleep(d)
101		iter := client.Single().Query(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
102		row := 0
103		for {
104			_, err := iter.Next()
105			if err == iterator.Done {
106				break
107			}
108			if err != nil {
109				b.Fatal(err)
110			}
111			row++
112			if row == 1 {
113				mu.Lock()
114				d := time.Millisecond * time.Duration(rnd.Int63n(holdSessionTime))
115				mu.Unlock()
116				time.Sleep(d)
117			}
118		}
119		iter.Stop()
120		results <- row
121	}
122}
123
124func writeWorker(client *Client, b *testing.B, jobs <-chan int, results chan<- int64) {
125	for range jobs {
126		mu.Lock()
127		d := time.Millisecond * time.Duration(rnd.Int63n(rndWaitTimeBetweenRequests))
128		mu.Unlock()
129		time.Sleep(d)
130		var updateCount int64
131		var err error
132		if _, err = client.ReadWriteTransaction(context.Background(), func(ctx context.Context, transaction *ReadWriteTransaction) error {
133			if updateCount, err = transaction.Update(ctx, NewStatement(UpdateBarSetFoo)); err != nil {
134				return err
135			}
136			return nil
137		}); err != nil {
138			b.Fatal(err)
139		}
140		results <- updateCount
141	}
142}
143
144func Benchmark_Client_BurstRead_IncStep01(b *testing.B) {
145	benchmarkClientBurstRead(b, 1)
146}
147
148func Benchmark_Client_BurstRead_IncStep10(b *testing.B) {
149	benchmarkClientBurstRead(b, 10)
150}
151
152func Benchmark_Client_BurstRead_IncStep20(b *testing.B) {
153	benchmarkClientBurstRead(b, 20)
154}
155
156func Benchmark_Client_BurstRead_IncStep25(b *testing.B) {
157	benchmarkClientBurstRead(b, 25)
158}
159
160func Benchmark_Client_BurstRead_IncStep30(b *testing.B) {
161	benchmarkClientBurstRead(b, 30)
162}
163
164func Benchmark_Client_BurstRead_IncStep40(b *testing.B) {
165	benchmarkClientBurstRead(b, 40)
166}
167
168func Benchmark_Client_BurstRead_IncStep50(b *testing.B) {
169	benchmarkClientBurstRead(b, 50)
170}
171
172func Benchmark_Client_BurstRead_IncStep100(b *testing.B) {
173	benchmarkClientBurstRead(b, 100)
174}
175
176func benchmarkClientBurstRead(b *testing.B, incStep uint64) {
177	for n := 0; n < b.N; n++ {
178		server, client, teardown := createBenchmarkServer(incStep)
179		sp := client.idleSessions
180		if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
181			b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
182		}
183
184		totalQueries := int(sp.MaxOpened * 8)
185		jobs := make(chan int, totalQueries)
186		results := make(chan int, totalQueries)
187		parallel := int(sp.MaxOpened * 2)
188
189		for w := 0; w < parallel; w++ {
190			go readWorker(client, b, jobs, results)
191		}
192		for j := 0; j < totalQueries; j++ {
193			jobs <- j
194		}
195		close(jobs)
196		totalRows := 0
197		for a := 0; a < totalQueries; a++ {
198			totalRows = totalRows + <-results
199		}
200		reportBenchmark(b, sp, server)
201		teardown()
202	}
203}
204
205func Benchmark_Client_BurstWrite01(b *testing.B) {
206	benchmarkClientBurstWrite(b, 1)
207}
208
209func Benchmark_Client_BurstWrite10(b *testing.B) {
210	benchmarkClientBurstWrite(b, 10)
211}
212
213func Benchmark_Client_BurstWrite20(b *testing.B) {
214	benchmarkClientBurstWrite(b, 20)
215}
216
217func Benchmark_Client_BurstWrite25(b *testing.B) {
218	benchmarkClientBurstWrite(b, 25)
219}
220
221func Benchmark_Client_BurstWrite30(b *testing.B) {
222	benchmarkClientBurstWrite(b, 30)
223}
224
225func Benchmark_Client_BurstWrite40(b *testing.B) {
226	benchmarkClientBurstWrite(b, 40)
227}
228
229func Benchmark_Client_BurstWrite50(b *testing.B) {
230	benchmarkClientBurstWrite(b, 50)
231}
232
233func Benchmark_Client_BurstWrite100(b *testing.B) {
234	benchmarkClientBurstWrite(b, 100)
235}
236
237func benchmarkClientBurstWrite(b *testing.B, incStep uint64) {
238	for n := 0; n < b.N; n++ {
239		server, client, teardown := createBenchmarkServer(incStep)
240		sp := client.idleSessions
241		if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
242			b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
243		}
244
245		totalUpdates := int(sp.MaxOpened * 8)
246		jobs := make(chan int, totalUpdates)
247		results := make(chan int64, totalUpdates)
248		parallel := int(sp.MaxOpened * 2)
249
250		for w := 0; w < parallel; w++ {
251			go writeWorker(client, b, jobs, results)
252		}
253		for j := 0; j < totalUpdates; j++ {
254			jobs <- j
255		}
256		close(jobs)
257		totalRows := int64(0)
258		for a := 0; a < totalUpdates; a++ {
259			totalRows = totalRows + <-results
260		}
261		reportBenchmark(b, sp, server)
262		teardown()
263	}
264}
265
266func Benchmark_Client_BurstReadAndWrite01(b *testing.B) {
267	benchmarkClientBurstReadAndWrite(b, 1)
268}
269
270func Benchmark_Client_BurstReadAndWrite10(b *testing.B) {
271	benchmarkClientBurstReadAndWrite(b, 10)
272}
273
274func Benchmark_Client_BurstReadAndWrite20(b *testing.B) {
275	benchmarkClientBurstReadAndWrite(b, 20)
276}
277
278func Benchmark_Client_BurstReadAndWrite25(b *testing.B) {
279	benchmarkClientBurstReadAndWrite(b, 25)
280}
281
282func Benchmark_Client_BurstReadAndWrite30(b *testing.B) {
283	benchmarkClientBurstReadAndWrite(b, 30)
284}
285
286func Benchmark_Client_BurstReadAndWrite40(b *testing.B) {
287	benchmarkClientBurstReadAndWrite(b, 40)
288}
289
290func Benchmark_Client_BurstReadAndWrite50(b *testing.B) {
291	benchmarkClientBurstReadAndWrite(b, 50)
292}
293
294func Benchmark_Client_BurstReadAndWrite100(b *testing.B) {
295	benchmarkClientBurstReadAndWrite(b, 100)
296}
297
298func benchmarkClientBurstReadAndWrite(b *testing.B, incStep uint64) {
299	for n := 0; n < b.N; n++ {
300		server, client, teardown := createBenchmarkServer(incStep)
301		sp := client.idleSessions
302		if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
303			b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
304		}
305
306		totalUpdates := int(sp.MaxOpened * 4)
307		writeJobs := make(chan int, totalUpdates)
308		writeResults := make(chan int64, totalUpdates)
309		parallelWrites := int(sp.MaxOpened)
310
311		totalQueries := int(sp.MaxOpened * 4)
312		readJobs := make(chan int, totalQueries)
313		readResults := make(chan int, totalQueries)
314		parallelReads := int(sp.MaxOpened)
315
316		for w := 0; w < parallelWrites; w++ {
317			go writeWorker(client, b, writeJobs, writeResults)
318		}
319		for j := 0; j < totalUpdates; j++ {
320			writeJobs <- j
321		}
322		for w := 0; w < parallelReads; w++ {
323			go readWorker(client, b, readJobs, readResults)
324		}
325		for j := 0; j < totalQueries; j++ {
326			readJobs <- j
327		}
328
329		close(writeJobs)
330		close(readJobs)
331
332		totalUpdatedRows := int64(0)
333		for a := 0; a < totalUpdates; a++ {
334			totalUpdatedRows = totalUpdatedRows + <-writeResults
335		}
336		totalReadRows := 0
337		for a := 0; a < totalQueries; a++ {
338			totalReadRows = totalReadRows + <-readResults
339		}
340		reportBenchmark(b, sp, server)
341		teardown()
342	}
343}
344
345func Benchmark_Client_SteadyIncrease01(b *testing.B) {
346	benchmarkClientSteadyIncrease(b, 1)
347}
348
349func Benchmark_Client_SteadyIncrease10(b *testing.B) {
350	benchmarkClientSteadyIncrease(b, 10)
351}
352
353func Benchmark_Client_SteadyIncrease20(b *testing.B) {
354	benchmarkClientSteadyIncrease(b, 20)
355}
356
357func Benchmark_Client_SteadyIncrease25(b *testing.B) {
358	benchmarkClientSteadyIncrease(b, 25)
359}
360
361func Benchmark_Client_SteadyIncrease30(b *testing.B) {
362	benchmarkClientSteadyIncrease(b, 30)
363}
364
365func Benchmark_Client_SteadyIncrease40(b *testing.B) {
366	benchmarkClientSteadyIncrease(b, 40)
367}
368
369func Benchmark_Client_SteadyIncrease50(b *testing.B) {
370	benchmarkClientSteadyIncrease(b, 50)
371}
372
373func Benchmark_Client_SteadyIncrease100(b *testing.B) {
374	benchmarkClientSteadyIncrease(b, 100)
375}
376
377func benchmarkClientSteadyIncrease(b *testing.B, incStep uint64) {
378	for n := 0; n < b.N; n++ {
379		server, client, teardown := createBenchmarkServer(incStep)
380		sp := client.idleSessions
381		if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened {
382			b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened)
383		}
384
385		transactions := make([]*ReadOnlyTransaction, sp.MaxOpened)
386		for i := uint64(0); i < sp.MaxOpened; i++ {
387			transactions[i] = client.ReadOnlyTransaction()
388			transactions[i].Query(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums))
389		}
390		for i := uint64(0); i < sp.MaxOpened; i++ {
391			transactions[i].Close()
392		}
393		reportBenchmark(b, sp, server)
394		teardown()
395	}
396}
397
398func reportBenchmark(b *testing.B, sp *sessionPool, server *MockedSpannerInMemTestServer) {
399	sp.mu.Lock()
400	defer sp.mu.Unlock()
401	requests := drainRequestsFromServer(server.TestSpanner)
402	// TODO(loite): Use b.ReportMetric when Go1.13 is the minimum required.
403	b.Logf("BatchCreateSessions: %d\t", countRequests(requests, reflect.TypeOf(&sppb.BatchCreateSessionsRequest{})))
404	b.Logf("CreateSession: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CreateSessionRequest{})))
405	b.Logf("BeginTransaction: %d\t", countRequests(requests, reflect.TypeOf(&sppb.BeginTransactionRequest{})))
406	b.Logf("Commit: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CommitRequest{})))
407	b.Logf("ReadSessions: %d\t", sp.idleList.Len())
408	b.Logf("WriteSessions: %d\n", sp.idleWriteList.Len())
409}
410
411func countRequests(requests []interface{}, tp reflect.Type) (count int) {
412	for _, req := range requests {
413		if tp == reflect.TypeOf(req) {
414			count++
415		}
416	}
417	return count
418}
419