1/* 2Copyright 2020 Google LLC 3 4Licensed under the Apache License, Version 2.0 (the "License"); 5you may not use this file except in compliance with the License. 6You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10Unless required by applicable law or agreed to in writing, software 11distributed under the License is distributed on an "AS IS" BASIS, 12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13See the License for the specific language governing permissions and 14limitations under the License. 15*/ 16 17package spanner 18 19import ( 20 "context" 21 "fmt" 22 "math/rand" 23 "reflect" 24 "sync" 25 "testing" 26 "time" 27 28 . "cloud.google.com/go/spanner/internal/testutil" 29 "google.golang.org/api/iterator" 30 sppb "google.golang.org/genproto/googleapis/spanner/v1" 31) 32 33const networkLatencyTime = 10 * time.Millisecond 34const batchCreateSessionsMinTime = 10 * time.Millisecond 35const batchCreateSessionsRndTime = 10 * time.Millisecond 36const beginTransactionMinTime = 1 * time.Millisecond 37const beginTransactionRndTime = 1 * time.Millisecond 38const commitTransactionMinTime = 5 * time.Millisecond 39const commitTransactionRndTime = 5 * time.Millisecond 40const executeStreamingSqlMinTime = 10 * time.Millisecond 41const executeStreamingSqlRndTime = 10 * time.Millisecond 42const executeSqlMinTime = 10 * time.Millisecond 43const executeSqlRndTime = 10 * time.Millisecond 44 45const holdSessionTime = 100 46const rndWaitTimeBetweenRequests = 10 47 48var mu sync.Mutex 49var rnd = rand.New(rand.NewSource(time.Now().UnixNano())) 50 51func createBenchmarkServer(incStep uint64) (server *MockedSpannerInMemTestServer, client *Client, teardown func()) { 52 t := &testing.T{} 53 server, client, teardown = setupMockedTestServerWithConfig(t, ClientConfig{ 54 SessionPoolConfig: SessionPoolConfig{ 55 MinOpened: 100, 56 MaxOpened: 400, 57 WriteSessions: 0.2, 58 incStep: incStep, 59 }, 60 }) 61 server.TestSpanner.PutExecutionTime(MethodBatchCreateSession, SimulatedExecutionTime{ 62 MinimumExecutionTime: networkLatencyTime + batchCreateSessionsMinTime, 63 RandomExecutionTime: batchCreateSessionsRndTime, 64 }) 65 server.TestSpanner.PutExecutionTime(MethodCreateSession, SimulatedExecutionTime{ 66 MinimumExecutionTime: networkLatencyTime + batchCreateSessionsMinTime, 67 RandomExecutionTime: batchCreateSessionsRndTime, 68 }) 69 server.TestSpanner.PutExecutionTime(MethodExecuteStreamingSql, SimulatedExecutionTime{ 70 MinimumExecutionTime: networkLatencyTime + executeStreamingSqlMinTime, 71 RandomExecutionTime: executeStreamingSqlRndTime, 72 }) 73 server.TestSpanner.PutExecutionTime(MethodBeginTransaction, SimulatedExecutionTime{ 74 MinimumExecutionTime: networkLatencyTime + beginTransactionMinTime, 75 RandomExecutionTime: beginTransactionRndTime, 76 }) 77 server.TestSpanner.PutExecutionTime(MethodCommitTransaction, SimulatedExecutionTime{ 78 MinimumExecutionTime: networkLatencyTime + commitTransactionMinTime, 79 RandomExecutionTime: commitTransactionRndTime, 80 }) 81 server.TestSpanner.PutExecutionTime(MethodExecuteSql, SimulatedExecutionTime{ 82 MinimumExecutionTime: networkLatencyTime + executeSqlMinTime, 83 RandomExecutionTime: executeSqlRndTime, 84 }) 85 // Wait until the session pool has been initialized. 86 waitFor(t, func() error { 87 if uint64(client.idleSessions.idleList.Len()+client.idleSessions.idleWriteList.Len()) == client.idleSessions.MinOpened { 88 return nil 89 } 90 return fmt.Errorf("not yet initialized") 91 }) 92 return 93} 94 95func readWorker(client *Client, b *testing.B, jobs <-chan int, results chan<- int) { 96 for range jobs { 97 mu.Lock() 98 d := time.Millisecond * time.Duration(rnd.Int63n(rndWaitTimeBetweenRequests)) 99 mu.Unlock() 100 time.Sleep(d) 101 iter := client.Single().Query(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 102 row := 0 103 for { 104 _, err := iter.Next() 105 if err == iterator.Done { 106 break 107 } 108 if err != nil { 109 b.Fatal(err) 110 } 111 row++ 112 if row == 1 { 113 mu.Lock() 114 d := time.Millisecond * time.Duration(rnd.Int63n(holdSessionTime)) 115 mu.Unlock() 116 time.Sleep(d) 117 } 118 } 119 iter.Stop() 120 results <- row 121 } 122} 123 124func writeWorker(client *Client, b *testing.B, jobs <-chan int, results chan<- int64) { 125 for range jobs { 126 mu.Lock() 127 d := time.Millisecond * time.Duration(rnd.Int63n(rndWaitTimeBetweenRequests)) 128 mu.Unlock() 129 time.Sleep(d) 130 var updateCount int64 131 var err error 132 if _, err = client.ReadWriteTransaction(context.Background(), func(ctx context.Context, transaction *ReadWriteTransaction) error { 133 if updateCount, err = transaction.Update(ctx, NewStatement(UpdateBarSetFoo)); err != nil { 134 return err 135 } 136 return nil 137 }); err != nil { 138 b.Fatal(err) 139 } 140 results <- updateCount 141 } 142} 143 144func Benchmark_Client_BurstRead_IncStep01(b *testing.B) { 145 benchmarkClientBurstRead(b, 1) 146} 147 148func Benchmark_Client_BurstRead_IncStep10(b *testing.B) { 149 benchmarkClientBurstRead(b, 10) 150} 151 152func Benchmark_Client_BurstRead_IncStep20(b *testing.B) { 153 benchmarkClientBurstRead(b, 20) 154} 155 156func Benchmark_Client_BurstRead_IncStep25(b *testing.B) { 157 benchmarkClientBurstRead(b, 25) 158} 159 160func Benchmark_Client_BurstRead_IncStep30(b *testing.B) { 161 benchmarkClientBurstRead(b, 30) 162} 163 164func Benchmark_Client_BurstRead_IncStep40(b *testing.B) { 165 benchmarkClientBurstRead(b, 40) 166} 167 168func Benchmark_Client_BurstRead_IncStep50(b *testing.B) { 169 benchmarkClientBurstRead(b, 50) 170} 171 172func Benchmark_Client_BurstRead_IncStep100(b *testing.B) { 173 benchmarkClientBurstRead(b, 100) 174} 175 176func benchmarkClientBurstRead(b *testing.B, incStep uint64) { 177 for n := 0; n < b.N; n++ { 178 server, client, teardown := createBenchmarkServer(incStep) 179 sp := client.idleSessions 180 if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened { 181 b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened) 182 } 183 184 totalQueries := int(sp.MaxOpened * 8) 185 jobs := make(chan int, totalQueries) 186 results := make(chan int, totalQueries) 187 parallel := int(sp.MaxOpened * 2) 188 189 for w := 0; w < parallel; w++ { 190 go readWorker(client, b, jobs, results) 191 } 192 for j := 0; j < totalQueries; j++ { 193 jobs <- j 194 } 195 close(jobs) 196 totalRows := 0 197 for a := 0; a < totalQueries; a++ { 198 totalRows = totalRows + <-results 199 } 200 reportBenchmark(b, sp, server) 201 teardown() 202 } 203} 204 205func Benchmark_Client_BurstWrite01(b *testing.B) { 206 benchmarkClientBurstWrite(b, 1) 207} 208 209func Benchmark_Client_BurstWrite10(b *testing.B) { 210 benchmarkClientBurstWrite(b, 10) 211} 212 213func Benchmark_Client_BurstWrite20(b *testing.B) { 214 benchmarkClientBurstWrite(b, 20) 215} 216 217func Benchmark_Client_BurstWrite25(b *testing.B) { 218 benchmarkClientBurstWrite(b, 25) 219} 220 221func Benchmark_Client_BurstWrite30(b *testing.B) { 222 benchmarkClientBurstWrite(b, 30) 223} 224 225func Benchmark_Client_BurstWrite40(b *testing.B) { 226 benchmarkClientBurstWrite(b, 40) 227} 228 229func Benchmark_Client_BurstWrite50(b *testing.B) { 230 benchmarkClientBurstWrite(b, 50) 231} 232 233func Benchmark_Client_BurstWrite100(b *testing.B) { 234 benchmarkClientBurstWrite(b, 100) 235} 236 237func benchmarkClientBurstWrite(b *testing.B, incStep uint64) { 238 for n := 0; n < b.N; n++ { 239 server, client, teardown := createBenchmarkServer(incStep) 240 sp := client.idleSessions 241 if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened { 242 b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened) 243 } 244 245 totalUpdates := int(sp.MaxOpened * 8) 246 jobs := make(chan int, totalUpdates) 247 results := make(chan int64, totalUpdates) 248 parallel := int(sp.MaxOpened * 2) 249 250 for w := 0; w < parallel; w++ { 251 go writeWorker(client, b, jobs, results) 252 } 253 for j := 0; j < totalUpdates; j++ { 254 jobs <- j 255 } 256 close(jobs) 257 totalRows := int64(0) 258 for a := 0; a < totalUpdates; a++ { 259 totalRows = totalRows + <-results 260 } 261 reportBenchmark(b, sp, server) 262 teardown() 263 } 264} 265 266func Benchmark_Client_BurstReadAndWrite01(b *testing.B) { 267 benchmarkClientBurstReadAndWrite(b, 1) 268} 269 270func Benchmark_Client_BurstReadAndWrite10(b *testing.B) { 271 benchmarkClientBurstReadAndWrite(b, 10) 272} 273 274func Benchmark_Client_BurstReadAndWrite20(b *testing.B) { 275 benchmarkClientBurstReadAndWrite(b, 20) 276} 277 278func Benchmark_Client_BurstReadAndWrite25(b *testing.B) { 279 benchmarkClientBurstReadAndWrite(b, 25) 280} 281 282func Benchmark_Client_BurstReadAndWrite30(b *testing.B) { 283 benchmarkClientBurstReadAndWrite(b, 30) 284} 285 286func Benchmark_Client_BurstReadAndWrite40(b *testing.B) { 287 benchmarkClientBurstReadAndWrite(b, 40) 288} 289 290func Benchmark_Client_BurstReadAndWrite50(b *testing.B) { 291 benchmarkClientBurstReadAndWrite(b, 50) 292} 293 294func Benchmark_Client_BurstReadAndWrite100(b *testing.B) { 295 benchmarkClientBurstReadAndWrite(b, 100) 296} 297 298func benchmarkClientBurstReadAndWrite(b *testing.B, incStep uint64) { 299 for n := 0; n < b.N; n++ { 300 server, client, teardown := createBenchmarkServer(incStep) 301 sp := client.idleSessions 302 if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened { 303 b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened) 304 } 305 306 totalUpdates := int(sp.MaxOpened * 4) 307 writeJobs := make(chan int, totalUpdates) 308 writeResults := make(chan int64, totalUpdates) 309 parallelWrites := int(sp.MaxOpened) 310 311 totalQueries := int(sp.MaxOpened * 4) 312 readJobs := make(chan int, totalQueries) 313 readResults := make(chan int, totalQueries) 314 parallelReads := int(sp.MaxOpened) 315 316 for w := 0; w < parallelWrites; w++ { 317 go writeWorker(client, b, writeJobs, writeResults) 318 } 319 for j := 0; j < totalUpdates; j++ { 320 writeJobs <- j 321 } 322 for w := 0; w < parallelReads; w++ { 323 go readWorker(client, b, readJobs, readResults) 324 } 325 for j := 0; j < totalQueries; j++ { 326 readJobs <- j 327 } 328 329 close(writeJobs) 330 close(readJobs) 331 332 totalUpdatedRows := int64(0) 333 for a := 0; a < totalUpdates; a++ { 334 totalUpdatedRows = totalUpdatedRows + <-writeResults 335 } 336 totalReadRows := 0 337 for a := 0; a < totalQueries; a++ { 338 totalReadRows = totalReadRows + <-readResults 339 } 340 reportBenchmark(b, sp, server) 341 teardown() 342 } 343} 344 345func Benchmark_Client_SteadyIncrease01(b *testing.B) { 346 benchmarkClientSteadyIncrease(b, 1) 347} 348 349func Benchmark_Client_SteadyIncrease10(b *testing.B) { 350 benchmarkClientSteadyIncrease(b, 10) 351} 352 353func Benchmark_Client_SteadyIncrease20(b *testing.B) { 354 benchmarkClientSteadyIncrease(b, 20) 355} 356 357func Benchmark_Client_SteadyIncrease25(b *testing.B) { 358 benchmarkClientSteadyIncrease(b, 25) 359} 360 361func Benchmark_Client_SteadyIncrease30(b *testing.B) { 362 benchmarkClientSteadyIncrease(b, 30) 363} 364 365func Benchmark_Client_SteadyIncrease40(b *testing.B) { 366 benchmarkClientSteadyIncrease(b, 40) 367} 368 369func Benchmark_Client_SteadyIncrease50(b *testing.B) { 370 benchmarkClientSteadyIncrease(b, 50) 371} 372 373func Benchmark_Client_SteadyIncrease100(b *testing.B) { 374 benchmarkClientSteadyIncrease(b, 100) 375} 376 377func benchmarkClientSteadyIncrease(b *testing.B, incStep uint64) { 378 for n := 0; n < b.N; n++ { 379 server, client, teardown := createBenchmarkServer(incStep) 380 sp := client.idleSessions 381 if uint64(sp.idleList.Len()+sp.idleWriteList.Len()) != sp.MinOpened { 382 b.Fatalf("session count mismatch\nGot: %d\nWant: %d", sp.idleList.Len()+sp.idleWriteList.Len(), sp.MinOpened) 383 } 384 385 transactions := make([]*ReadOnlyTransaction, sp.MaxOpened) 386 for i := uint64(0); i < sp.MaxOpened; i++ { 387 transactions[i] = client.ReadOnlyTransaction() 388 transactions[i].Query(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums)) 389 } 390 for i := uint64(0); i < sp.MaxOpened; i++ { 391 transactions[i].Close() 392 } 393 reportBenchmark(b, sp, server) 394 teardown() 395 } 396} 397 398func reportBenchmark(b *testing.B, sp *sessionPool, server *MockedSpannerInMemTestServer) { 399 sp.mu.Lock() 400 defer sp.mu.Unlock() 401 requests := drainRequestsFromServer(server.TestSpanner) 402 // TODO(loite): Use b.ReportMetric when Go1.13 is the minimum required. 403 b.Logf("BatchCreateSessions: %d\t", countRequests(requests, reflect.TypeOf(&sppb.BatchCreateSessionsRequest{}))) 404 b.Logf("CreateSession: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CreateSessionRequest{}))) 405 b.Logf("BeginTransaction: %d\t", countRequests(requests, reflect.TypeOf(&sppb.BeginTransactionRequest{}))) 406 b.Logf("Commit: %d\t", countRequests(requests, reflect.TypeOf(&sppb.CommitRequest{}))) 407 b.Logf("ReadSessions: %d\t", sp.idleList.Len()) 408 b.Logf("WriteSessions: %d\n", sp.idleWriteList.Len()) 409} 410 411func countRequests(requests []interface{}, tp reflect.Type) (count int) { 412 for _, req := range requests { 413 if tp == reflect.TypeOf(req) { 414 count++ 415 } 416 } 417 return count 418} 419