1package main 2 3import ( 4 "crypto/rand" 5 "encoding/binary" 6 "flag" 7 "fmt" 8 "log" 9 mrand "math/rand" 10 "net/http" 11 _ "net/http/pprof" 12 "os" 13 "os/signal" 14 "path" 15 "runtime" 16 "strconv" 17 "strings" 18 "sync" 19 "sync/atomic" 20 "time" 21 22 "github.com/syndtr/goleveldb/leveldb" 23 "github.com/syndtr/goleveldb/leveldb/errors" 24 "github.com/syndtr/goleveldb/leveldb/opt" 25 "github.com/syndtr/goleveldb/leveldb/storage" 26 "github.com/syndtr/goleveldb/leveldb/table" 27 "github.com/syndtr/goleveldb/leveldb/util" 28) 29 30var ( 31 dbPath = path.Join(os.TempDir(), "goleveldb-testdb") 32 openFilesCacheCapacity = 500 33 keyLen = 63 34 valueLen = 256 35 numKeys = arrayInt{100000, 1332, 531, 1234, 9553, 1024, 35743} 36 httpProf = "127.0.0.1:5454" 37 transactionProb = 0.5 38 enableBlockCache = false 39 enableCompression = false 40 enableBufferPool = false 41 42 wg = new(sync.WaitGroup) 43 done, fail uint32 44 45 bpool *util.BufferPool 46) 47 48type arrayInt []int 49 50func (a arrayInt) String() string { 51 var str string 52 for i, n := range a { 53 if i > 0 { 54 str += "," 55 } 56 str += strconv.Itoa(n) 57 } 58 return str 59} 60 61func (a *arrayInt) Set(str string) error { 62 var na arrayInt 63 for _, s := range strings.Split(str, ",") { 64 s = strings.TrimSpace(s) 65 if s != "" { 66 n, err := strconv.Atoi(s) 67 if err != nil { 68 return err 69 } 70 na = append(na, n) 71 } 72 } 73 *a = na 74 return nil 75} 76 77func init() { 78 flag.StringVar(&dbPath, "db", dbPath, "testdb path") 79 flag.IntVar(&openFilesCacheCapacity, "openfilescachecap", openFilesCacheCapacity, "open files cache capacity") 80 flag.IntVar(&keyLen, "keylen", keyLen, "key length") 81 flag.IntVar(&valueLen, "valuelen", valueLen, "value length") 82 flag.Var(&numKeys, "numkeys", "num keys") 83 flag.StringVar(&httpProf, "httpprof", httpProf, "http pprof listen addr") 84 flag.Float64Var(&transactionProb, "transactionprob", transactionProb, "probablity of writes using transaction") 85 flag.BoolVar(&enableBufferPool, "enablebufferpool", enableBufferPool, "enable buffer pool") 86 flag.BoolVar(&enableBlockCache, "enableblockcache", enableBlockCache, "enable block cache") 87 flag.BoolVar(&enableCompression, "enablecompression", enableCompression, "enable block compression") 88} 89 90func randomData(dst []byte, ns, prefix byte, i uint32, dataLen int) []byte { 91 if dataLen < (2+4+4)*2+4 { 92 panic("dataLen is too small") 93 } 94 if cap(dst) < dataLen { 95 dst = make([]byte, dataLen) 96 } else { 97 dst = dst[:dataLen] 98 } 99 half := (dataLen - 4) / 2 100 if _, err := rand.Reader.Read(dst[2 : half-8]); err != nil { 101 panic(err) 102 } 103 dst[0] = ns 104 dst[1] = prefix 105 binary.LittleEndian.PutUint32(dst[half-8:], i) 106 binary.LittleEndian.PutUint32(dst[half-8:], i) 107 binary.LittleEndian.PutUint32(dst[half-4:], util.NewCRC(dst[:half-4]).Value()) 108 full := half * 2 109 copy(dst[half:full], dst[:half]) 110 if full < dataLen-4 { 111 if _, err := rand.Reader.Read(dst[full : dataLen-4]); err != nil { 112 panic(err) 113 } 114 } 115 binary.LittleEndian.PutUint32(dst[dataLen-4:], util.NewCRC(dst[:dataLen-4]).Value()) 116 return dst 117} 118 119func dataSplit(data []byte) (data0, data1 []byte) { 120 n := (len(data) - 4) / 2 121 return data[:n], data[n : n+n] 122} 123 124func dataNS(data []byte) byte { 125 return data[0] 126} 127 128func dataPrefix(data []byte) byte { 129 return data[1] 130} 131 132func dataI(data []byte) uint32 { 133 return binary.LittleEndian.Uint32(data[(len(data)-4)/2-8:]) 134} 135 136func dataChecksum(data []byte) (uint32, uint32) { 137 checksum0 := binary.LittleEndian.Uint32(data[len(data)-4:]) 138 checksum1 := util.NewCRC(data[:len(data)-4]).Value() 139 return checksum0, checksum1 140} 141 142func dataPrefixSlice(ns, prefix byte) *util.Range { 143 return util.BytesPrefix([]byte{ns, prefix}) 144} 145 146func dataNsSlice(ns byte) *util.Range { 147 return util.BytesPrefix([]byte{ns}) 148} 149 150type testingStorage struct { 151 storage.Storage 152} 153 154func (ts *testingStorage) scanTable(fd storage.FileDesc, checksum bool) (corrupted bool) { 155 r, err := ts.Open(fd) 156 if err != nil { 157 log.Fatal(err) 158 } 159 defer r.Close() 160 161 size, err := r.Seek(0, os.SEEK_END) 162 if err != nil { 163 log.Fatal(err) 164 } 165 166 o := &opt.Options{ 167 DisableLargeBatchTransaction: true, 168 Strict: opt.NoStrict, 169 } 170 if checksum { 171 o.Strict = opt.StrictBlockChecksum | opt.StrictReader 172 } 173 tr, err := table.NewReader(r, size, fd, nil, bpool, o) 174 if err != nil { 175 log.Fatal(err) 176 } 177 defer tr.Release() 178 179 checkData := func(i int, t string, data []byte) bool { 180 if len(data) == 0 { 181 panic(fmt.Sprintf("[%v] nil data: i=%d t=%s", fd, i, t)) 182 } 183 184 checksum0, checksum1 := dataChecksum(data) 185 if checksum0 != checksum1 { 186 atomic.StoreUint32(&fail, 1) 187 atomic.StoreUint32(&done, 1) 188 corrupted = true 189 190 data0, data1 := dataSplit(data) 191 data0c0, data0c1 := dataChecksum(data0) 192 data1c0, data1c1 := dataChecksum(data1) 193 log.Printf("FATAL: [%v] Corrupted data i=%d t=%s (%#x != %#x): %x(%v) vs %x(%v)", 194 fd, i, t, checksum0, checksum1, data0, data0c0 == data0c1, data1, data1c0 == data1c1) 195 return true 196 } 197 return false 198 } 199 200 iter := tr.NewIterator(nil, nil) 201 defer iter.Release() 202 for i := 0; iter.Next(); i++ { 203 ukey, _, kt, kerr := parseIkey(iter.Key()) 204 if kerr != nil { 205 atomic.StoreUint32(&fail, 1) 206 atomic.StoreUint32(&done, 1) 207 corrupted = true 208 209 log.Printf("FATAL: [%v] Corrupted ikey i=%d: %v", fd, i, kerr) 210 return 211 } 212 if checkData(i, "key", ukey) { 213 return 214 } 215 if kt == ktVal && checkData(i, "value", iter.Value()) { 216 return 217 } 218 } 219 if err := iter.Error(); err != nil { 220 if errors.IsCorrupted(err) { 221 atomic.StoreUint32(&fail, 1) 222 atomic.StoreUint32(&done, 1) 223 corrupted = true 224 225 log.Printf("FATAL: [%v] Corruption detected: %v", fd, err) 226 } else { 227 log.Fatal(err) 228 } 229 } 230 231 return 232} 233 234func (ts *testingStorage) Remove(fd storage.FileDesc) error { 235 if atomic.LoadUint32(&fail) == 1 { 236 return nil 237 } 238 239 if fd.Type == storage.TypeTable { 240 if ts.scanTable(fd, true) { 241 return nil 242 } 243 } 244 return ts.Storage.Remove(fd) 245} 246 247type latencyStats struct { 248 mark time.Time 249 dur, min, max time.Duration 250 num int 251} 252 253func (s *latencyStats) start() { 254 s.mark = time.Now() 255} 256 257func (s *latencyStats) record(n int) { 258 if s.mark.IsZero() { 259 panic("not started") 260 } 261 dur := time.Now().Sub(s.mark) 262 dur1 := dur / time.Duration(n) 263 if dur1 < s.min || s.min == 0 { 264 s.min = dur1 265 } 266 if dur1 > s.max { 267 s.max = dur1 268 } 269 s.dur += dur 270 s.num += n 271 s.mark = time.Time{} 272} 273 274func (s *latencyStats) ratePerSec() int { 275 durSec := s.dur / time.Second 276 if durSec > 0 { 277 return s.num / int(durSec) 278 } 279 return s.num 280} 281 282func (s *latencyStats) avg() time.Duration { 283 if s.num > 0 { 284 return s.dur / time.Duration(s.num) 285 } 286 return 0 287} 288 289func (s *latencyStats) add(x *latencyStats) { 290 if x.min < s.min || s.min == 0 { 291 s.min = x.min 292 } 293 if x.max > s.max { 294 s.max = x.max 295 } 296 s.dur += x.dur 297 s.num += x.num 298} 299 300func main() { 301 flag.Parse() 302 303 if enableBufferPool { 304 bpool = util.NewBufferPool(opt.DefaultBlockSize + 128) 305 } 306 307 log.Printf("Test DB stored at %q", dbPath) 308 if httpProf != "" { 309 log.Printf("HTTP pprof listening at %q", httpProf) 310 runtime.SetBlockProfileRate(1) 311 go func() { 312 if err := http.ListenAndServe(httpProf, nil); err != nil { 313 log.Fatalf("HTTPPROF: %v", err) 314 } 315 }() 316 } 317 318 runtime.GOMAXPROCS(runtime.NumCPU()) 319 320 os.RemoveAll(dbPath) 321 stor, err := storage.OpenFile(dbPath, false) 322 if err != nil { 323 log.Fatal(err) 324 } 325 tstor := &testingStorage{stor} 326 defer tstor.Close() 327 328 fatalf := func(err error, format string, v ...interface{}) { 329 atomic.StoreUint32(&fail, 1) 330 atomic.StoreUint32(&done, 1) 331 log.Printf("FATAL: "+format, v...) 332 if err != nil && errors.IsCorrupted(err) { 333 cerr := err.(*errors.ErrCorrupted) 334 if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable { 335 log.Print("FATAL: corruption detected, scanning...") 336 if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) { 337 log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd) 338 } 339 } 340 } 341 runtime.Goexit() 342 } 343 344 if openFilesCacheCapacity == 0 { 345 openFilesCacheCapacity = -1 346 } 347 o := &opt.Options{ 348 OpenFilesCacheCapacity: openFilesCacheCapacity, 349 DisableBufferPool: !enableBufferPool, 350 DisableBlockCache: !enableBlockCache, 351 ErrorIfExist: true, 352 Compression: opt.NoCompression, 353 } 354 if enableCompression { 355 o.Compression = opt.DefaultCompression 356 } 357 358 db, err := leveldb.Open(tstor, o) 359 if err != nil { 360 log.Fatal(err) 361 } 362 defer db.Close() 363 364 var ( 365 mu = &sync.Mutex{} 366 gGetStat = &latencyStats{} 367 gIterStat = &latencyStats{} 368 gWriteStat = &latencyStats{} 369 gTrasactionStat = &latencyStats{} 370 startTime = time.Now() 371 372 writeReq = make(chan *leveldb.Batch) 373 writeAck = make(chan error) 374 writeAckAck = make(chan struct{}) 375 ) 376 377 go func() { 378 for b := range writeReq { 379 380 var err error 381 if mrand.Float64() < transactionProb { 382 log.Print("> Write using transaction") 383 gTrasactionStat.start() 384 var tr *leveldb.Transaction 385 if tr, err = db.OpenTransaction(); err == nil { 386 if err = tr.Write(b, nil); err == nil { 387 if err = tr.Commit(); err == nil { 388 gTrasactionStat.record(b.Len()) 389 } 390 } else { 391 tr.Discard() 392 } 393 } 394 } else { 395 gWriteStat.start() 396 if err = db.Write(b, nil); err == nil { 397 gWriteStat.record(b.Len()) 398 } 399 } 400 writeAck <- err 401 <-writeAckAck 402 } 403 }() 404 405 go func() { 406 for { 407 time.Sleep(3 * time.Second) 408 409 log.Print("------------------------") 410 411 log.Printf("> Elapsed=%v", time.Now().Sub(startTime)) 412 mu.Lock() 413 log.Printf("> GetLatencyMin=%v GetLatencyMax=%v GetLatencyAvg=%v GetRatePerSec=%d", 414 gGetStat.min, gGetStat.max, gGetStat.avg(), gGetStat.ratePerSec()) 415 log.Printf("> IterLatencyMin=%v IterLatencyMax=%v IterLatencyAvg=%v IterRatePerSec=%d", 416 gIterStat.min, gIterStat.max, gIterStat.avg(), gIterStat.ratePerSec()) 417 log.Printf("> WriteLatencyMin=%v WriteLatencyMax=%v WriteLatencyAvg=%v WriteRatePerSec=%d", 418 gWriteStat.min, gWriteStat.max, gWriteStat.avg(), gWriteStat.ratePerSec()) 419 log.Printf("> TransactionLatencyMin=%v TransactionLatencyMax=%v TransactionLatencyAvg=%v TransactionRatePerSec=%d", 420 gTrasactionStat.min, gTrasactionStat.max, gTrasactionStat.avg(), gTrasactionStat.ratePerSec()) 421 mu.Unlock() 422 423 cachedblock, _ := db.GetProperty("leveldb.cachedblock") 424 openedtables, _ := db.GetProperty("leveldb.openedtables") 425 alivesnaps, _ := db.GetProperty("leveldb.alivesnaps") 426 aliveiters, _ := db.GetProperty("leveldb.aliveiters") 427 blockpool, _ := db.GetProperty("leveldb.blockpool") 428 writeDelay, _ := db.GetProperty("leveldb.writedelay") 429 ioStats, _ := db.GetProperty("leveldb.iostats") 430 log.Printf("> BlockCache=%s OpenedTables=%s AliveSnaps=%s AliveIter=%s BlockPool=%q WriteDelay=%q IOStats=%q", 431 cachedblock, openedtables, alivesnaps, aliveiters, blockpool, writeDelay, ioStats) 432 log.Print("------------------------") 433 } 434 }() 435 436 for ns, numKey := range numKeys { 437 func(ns, numKey int) { 438 log.Printf("[%02d] STARTING: numKey=%d", ns, numKey) 439 440 keys := make([][]byte, numKey) 441 for i := range keys { 442 keys[i] = randomData(nil, byte(ns), 1, uint32(i), keyLen) 443 } 444 445 wg.Add(1) 446 go func() { 447 var wi uint32 448 defer func() { 449 log.Printf("[%02d] WRITER DONE #%d", ns, wi) 450 wg.Done() 451 }() 452 453 var ( 454 b = new(leveldb.Batch) 455 k2, v2 []byte 456 nReader int32 457 ) 458 for atomic.LoadUint32(&done) == 0 { 459 log.Printf("[%02d] WRITER #%d", ns, wi) 460 461 b.Reset() 462 for _, k1 := range keys { 463 k2 = randomData(k2, byte(ns), 2, wi, keyLen) 464 v2 = randomData(v2, byte(ns), 3, wi, valueLen) 465 b.Put(k2, v2) 466 b.Put(k1, k2) 467 } 468 writeReq <- b 469 if err := <-writeAck; err != nil { 470 writeAckAck <- struct{}{} 471 fatalf(err, "[%02d] WRITER #%d db.Write: %v", ns, wi, err) 472 } 473 474 snap, err := db.GetSnapshot() 475 if err != nil { 476 writeAckAck <- struct{}{} 477 fatalf(err, "[%02d] WRITER #%d db.GetSnapshot: %v", ns, wi, err) 478 } 479 480 writeAckAck <- struct{}{} 481 482 wg.Add(1) 483 atomic.AddInt32(&nReader, 1) 484 go func(snapwi uint32, snap *leveldb.Snapshot) { 485 var ( 486 ri int 487 iterStat = &latencyStats{} 488 getStat = &latencyStats{} 489 ) 490 defer func() { 491 mu.Lock() 492 gGetStat.add(getStat) 493 gIterStat.add(iterStat) 494 mu.Unlock() 495 496 atomic.AddInt32(&nReader, -1) 497 log.Printf("[%02d] READER #%d.%d DONE Snap=%v Alive=%d IterLatency=%v GetLatency=%v", ns, snapwi, ri, snap, atomic.LoadInt32(&nReader), iterStat.avg(), getStat.avg()) 498 snap.Release() 499 wg.Done() 500 }() 501 502 stopi := snapwi + 3 503 for (ri < 3 || atomic.LoadUint32(&wi) < stopi) && atomic.LoadUint32(&done) == 0 { 504 var n int 505 iter := snap.NewIterator(dataPrefixSlice(byte(ns), 1), nil) 506 iterStat.start() 507 for iter.Next() { 508 k1 := iter.Key() 509 k2 := iter.Value() 510 iterStat.record(1) 511 512 if dataNS(k2) != byte(ns) { 513 fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key NS: want=%d got=%d", ns, snapwi, ri, n, ns, dataNS(k2)) 514 } 515 516 kwritei := dataI(k2) 517 if kwritei != snapwi { 518 fatalf(nil, "[%02d] READER #%d.%d K%d invalid in-key iter num: %d", ns, snapwi, ri, n, kwritei) 519 } 520 521 getStat.start() 522 v2, err := snap.Get(k2, nil) 523 if err != nil { 524 fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2) 525 } 526 getStat.record(1) 527 528 if checksum0, checksum1 := dataChecksum(v2); checksum0 != checksum1 { 529 err := &errors.ErrCorrupted{Fd: storage.FileDesc{Type: 0xff, Num: 0}, Err: fmt.Errorf("v2: %x: checksum mismatch: %v vs %v", v2, checksum0, checksum1)} 530 fatalf(err, "[%02d] READER #%d.%d K%d snap.Get: %v\nk1: %x\n -> k2: %x", ns, snapwi, ri, n, err, k1, k2) 531 } 532 533 n++ 534 iterStat.start() 535 } 536 iter.Release() 537 if err := iter.Error(); err != nil { 538 fatalf(err, "[%02d] READER #%d.%d K%d iter.Error: %v", ns, snapwi, ri, numKey, err) 539 } 540 if n != numKey { 541 fatalf(nil, "[%02d] READER #%d.%d missing keys: want=%d got=%d", ns, snapwi, ri, numKey, n) 542 } 543 544 ri++ 545 } 546 }(wi, snap) 547 548 atomic.AddUint32(&wi, 1) 549 } 550 }() 551 552 delB := new(leveldb.Batch) 553 wg.Add(1) 554 go func() { 555 var ( 556 i int 557 iterStat = &latencyStats{} 558 ) 559 defer func() { 560 log.Printf("[%02d] SCANNER DONE #%d", ns, i) 561 wg.Done() 562 }() 563 564 time.Sleep(2 * time.Second) 565 566 for atomic.LoadUint32(&done) == 0 { 567 var n int 568 delB.Reset() 569 iter := db.NewIterator(dataNsSlice(byte(ns)), nil) 570 iterStat.start() 571 for iter.Next() && atomic.LoadUint32(&done) == 0 { 572 k := iter.Key() 573 v := iter.Value() 574 iterStat.record(1) 575 576 for ci, x := range [...][]byte{k, v} { 577 checksum0, checksum1 := dataChecksum(x) 578 if checksum0 != checksum1 { 579 if ci == 0 { 580 fatalf(nil, "[%02d] SCANNER %d.%d invalid key checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v) 581 } else { 582 fatalf(nil, "[%02d] SCANNER %d.%d invalid value checksum: want %d, got %d\n%x -> %x", ns, i, n, checksum0, checksum1, k, v) 583 } 584 } 585 } 586 587 if dataPrefix(k) == 2 || mrand.Int()%999 == 0 { 588 delB.Delete(k) 589 } 590 591 n++ 592 iterStat.start() 593 } 594 iter.Release() 595 if err := iter.Error(); err != nil { 596 fatalf(err, "[%02d] SCANNER #%d.%d iter.Error: %v", ns, i, n, err) 597 } 598 599 if n > 0 { 600 log.Printf("[%02d] SCANNER #%d IterLatency=%v", ns, i, iterStat.avg()) 601 } 602 603 if delB.Len() > 0 && atomic.LoadUint32(&done) == 0 { 604 t := time.Now() 605 writeReq <- delB 606 if err := <-writeAck; err != nil { 607 writeAckAck <- struct{}{} 608 fatalf(err, "[%02d] SCANNER #%d db.Write: %v", ns, i, err) 609 } else { 610 writeAckAck <- struct{}{} 611 } 612 log.Printf("[%02d] SCANNER #%d Deleted=%d Time=%v", ns, i, delB.Len(), time.Now().Sub(t)) 613 } 614 615 i++ 616 } 617 }() 618 }(ns, numKey) 619 } 620 621 go func() { 622 sig := make(chan os.Signal) 623 signal.Notify(sig, os.Interrupt, os.Kill) 624 log.Printf("Got signal: %v, exiting...", <-sig) 625 atomic.StoreUint32(&done, 1) 626 }() 627 628 wg.Wait() 629} 630