1// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package testutil 8 9import ( 10 "bytes" 11 "fmt" 12 "io" 13 "math/rand" 14 "os" 15 "path/filepath" 16 "runtime" 17 "strings" 18 "sync" 19 20 . "github.com/onsi/gomega" 21 22 "github.com/syndtr/goleveldb/leveldb/storage" 23) 24 25var ( 26 storageMu sync.Mutex 27 storageUseFS = true 28 storageKeepFS = false 29 storageNum int 30) 31 32type StorageMode int 33 34const ( 35 ModeOpen StorageMode = 1 << iota 36 ModeCreate 37 ModeRemove 38 ModeRename 39 ModeRead 40 ModeWrite 41 ModeSync 42 ModeClose 43) 44 45const ( 46 modeOpen = iota 47 modeCreate 48 modeRemove 49 modeRename 50 modeRead 51 modeWrite 52 modeSync 53 modeClose 54 55 modeCount 56) 57 58const ( 59 typeManifest = iota 60 typeJournal 61 typeTable 62 typeTemp 63 64 typeCount 65) 66 67const flattenCount = modeCount * typeCount 68 69func flattenType(m StorageMode, t storage.FileType) int { 70 var x int 71 switch m { 72 case ModeOpen: 73 x = modeOpen 74 case ModeCreate: 75 x = modeCreate 76 case ModeRemove: 77 x = modeRemove 78 case ModeRename: 79 x = modeRename 80 case ModeRead: 81 x = modeRead 82 case ModeWrite: 83 x = modeWrite 84 case ModeSync: 85 x = modeSync 86 case ModeClose: 87 x = modeClose 88 default: 89 panic("invalid storage mode") 90 } 91 x *= typeCount 92 switch t { 93 case storage.TypeManifest: 94 return x + typeManifest 95 case storage.TypeJournal: 96 return x + typeJournal 97 case storage.TypeTable: 98 return x + typeTable 99 case storage.TypeTemp: 100 return x + typeTemp 101 default: 102 panic("invalid file type") 103 } 104} 105 106func listFlattenType(m StorageMode, t storage.FileType) []int { 107 ret := make([]int, 0, flattenCount) 108 add := func(x int) { 109 x *= typeCount 110 switch { 111 case t&storage.TypeManifest != 0: 112 ret = append(ret, x+typeManifest) 113 case t&storage.TypeJournal != 0: 114 ret = append(ret, x+typeJournal) 115 case t&storage.TypeTable != 0: 116 ret = append(ret, x+typeTable) 117 case t&storage.TypeTemp != 0: 118 ret = append(ret, x+typeTemp) 119 } 120 } 121 switch { 122 case m&ModeOpen != 0: 123 add(modeOpen) 124 case m&ModeCreate != 0: 125 add(modeCreate) 126 case m&ModeRemove != 0: 127 add(modeRemove) 128 case m&ModeRename != 0: 129 add(modeRename) 130 case m&ModeRead != 0: 131 add(modeRead) 132 case m&ModeWrite != 0: 133 add(modeWrite) 134 case m&ModeSync != 0: 135 add(modeSync) 136 case m&ModeClose != 0: 137 add(modeClose) 138 } 139 return ret 140} 141 142func packFile(fd storage.FileDesc) uint64 { 143 if fd.Num>>(63-typeCount) != 0 { 144 panic("overflow") 145 } 146 return uint64(fd.Num<<typeCount) | uint64(fd.Type) 147} 148 149func unpackFile(x uint64) storage.FileDesc { 150 return storage.FileDesc{ 151 Type: storage.FileType(x) & storage.TypeAll, 152 Num: int64(x >> typeCount), 153 } 154} 155 156type emulatedError struct { 157 err error 158} 159 160func (err emulatedError) Error() string { 161 return fmt.Sprintf("emulated storage error: %v", err.err) 162} 163 164type storageLock struct { 165 s *Storage 166 l storage.Locker 167} 168 169func (l storageLock) Unlock() { 170 l.l.Unlock() 171 l.s.logI("storage lock released") 172} 173 174type reader struct { 175 s *Storage 176 fd storage.FileDesc 177 storage.Reader 178} 179 180func (r *reader) Read(p []byte) (n int, err error) { 181 err = r.s.emulateError(ModeRead, r.fd.Type) 182 if err == nil { 183 r.s.stall(ModeRead, r.fd.Type) 184 n, err = r.Reader.Read(p) 185 } 186 r.s.count(ModeRead, r.fd.Type, n) 187 if err != nil && err != io.EOF { 188 r.s.logI("read error, fd=%s n=%d err=%v", r.fd, n, err) 189 } 190 return 191} 192 193func (r *reader) ReadAt(p []byte, off int64) (n int, err error) { 194 err = r.s.emulateError(ModeRead, r.fd.Type) 195 if err == nil { 196 r.s.stall(ModeRead, r.fd.Type) 197 n, err = r.Reader.ReadAt(p, off) 198 } 199 r.s.count(ModeRead, r.fd.Type, n) 200 if err != nil && err != io.EOF { 201 r.s.logI("readAt error, fd=%s offset=%d n=%d err=%v", r.fd, off, n, err) 202 } 203 return 204} 205 206func (r *reader) Close() (err error) { 207 return r.s.fileClose(r.fd, r.Reader) 208} 209 210type writer struct { 211 s *Storage 212 fd storage.FileDesc 213 storage.Writer 214} 215 216func (w *writer) Write(p []byte) (n int, err error) { 217 err = w.s.emulateError(ModeWrite, w.fd.Type) 218 if err == nil { 219 w.s.stall(ModeWrite, w.fd.Type) 220 n, err = w.Writer.Write(p) 221 } 222 w.s.count(ModeWrite, w.fd.Type, n) 223 if err != nil && err != io.EOF { 224 w.s.logI("write error, fd=%s n=%d err=%v", w.fd, n, err) 225 } 226 return 227} 228 229func (w *writer) Sync() (err error) { 230 err = w.s.emulateError(ModeSync, w.fd.Type) 231 if err == nil { 232 w.s.stall(ModeSync, w.fd.Type) 233 err = w.Writer.Sync() 234 } 235 w.s.count(ModeSync, w.fd.Type, 0) 236 if err != nil { 237 w.s.logI("sync error, fd=%s err=%v", w.fd, err) 238 } 239 return 240} 241 242func (w *writer) Close() (err error) { 243 return w.s.fileClose(w.fd, w.Writer) 244} 245 246type Storage struct { 247 storage.Storage 248 path string 249 onClose func() (preserve bool, err error) 250 onLog func(str string) 251 252 lmu sync.Mutex 253 lb bytes.Buffer 254 255 mu sync.Mutex 256 rand *rand.Rand 257 // Open files, true=writer, false=reader 258 opens map[uint64]bool 259 counters [flattenCount]int 260 bytesCounter [flattenCount]int64 261 emulatedError [flattenCount]error 262 emulatedErrorOnce [flattenCount]bool 263 emulatedRandomError [flattenCount]error 264 emulatedRandomErrorProb [flattenCount]float64 265 stallCond sync.Cond 266 stalled [flattenCount]bool 267} 268 269func (s *Storage) log(skip int, str string) { 270 s.lmu.Lock() 271 defer s.lmu.Unlock() 272 _, file, line, ok := runtime.Caller(skip + 2) 273 if ok { 274 // Truncate file name at last file name separator. 275 if index := strings.LastIndex(file, "/"); index >= 0 { 276 file = file[index+1:] 277 } else if index = strings.LastIndex(file, "\\"); index >= 0 { 278 file = file[index+1:] 279 } 280 } else { 281 file = "???" 282 line = 1 283 } 284 fmt.Fprintf(&s.lb, "%s:%d: ", file, line) 285 lines := strings.Split(str, "\n") 286 if l := len(lines); l > 1 && lines[l-1] == "" { 287 lines = lines[:l-1] 288 } 289 for i, line := range lines { 290 if i > 0 { 291 s.lb.WriteString("\n\t") 292 } 293 s.lb.WriteString(line) 294 } 295 if s.onLog != nil { 296 s.onLog(s.lb.String()) 297 s.lb.Reset() 298 } else { 299 s.lb.WriteByte('\n') 300 } 301} 302 303func (s *Storage) logISkip(skip int, format string, args ...interface{}) { 304 pc, _, _, ok := runtime.Caller(skip + 1) 305 if ok { 306 if f := runtime.FuncForPC(pc); f != nil { 307 fname := f.Name() 308 if index := strings.LastIndex(fname, "."); index >= 0 { 309 fname = fname[index+1:] 310 } 311 format = fname + ": " + format 312 } 313 } 314 s.log(skip+1, fmt.Sprintf(format, args...)) 315} 316 317func (s *Storage) logI(format string, args ...interface{}) { 318 s.logISkip(1, format, args...) 319} 320 321func (s *Storage) OnLog(onLog func(log string)) { 322 s.lmu.Lock() 323 s.onLog = onLog 324 if s.lb.Len() != 0 { 325 log := s.lb.String() 326 s.onLog(log[:len(log)-1]) 327 s.lb.Reset() 328 } 329 s.lmu.Unlock() 330} 331 332func (s *Storage) Log(str string) { 333 s.log(1, "Log: "+str) 334 s.Storage.Log(str) 335} 336 337func (s *Storage) Lock() (l storage.Locker, err error) { 338 l, err = s.Storage.Lock() 339 if err != nil { 340 s.logI("storage locking failed, err=%v", err) 341 } else { 342 s.logI("storage locked") 343 l = storageLock{s, l} 344 } 345 return 346} 347 348func (s *Storage) List(t storage.FileType) (fds []storage.FileDesc, err error) { 349 fds, err = s.Storage.List(t) 350 if err != nil { 351 s.logI("list failed, err=%v", err) 352 return 353 } 354 s.logI("list, type=0x%x count=%d", int(t), len(fds)) 355 return 356} 357 358func (s *Storage) GetMeta() (fd storage.FileDesc, err error) { 359 fd, err = s.Storage.GetMeta() 360 if err != nil { 361 if !os.IsNotExist(err) { 362 s.logI("get meta failed, err=%v", err) 363 } 364 return 365 } 366 s.logI("get meta, fd=%s", fd) 367 return 368} 369 370func (s *Storage) SetMeta(fd storage.FileDesc) error { 371 ExpectWithOffset(1, fd.Type).To(Equal(storage.TypeManifest)) 372 err := s.Storage.SetMeta(fd) 373 if err != nil { 374 s.logI("set meta failed, fd=%s err=%v", fd, err) 375 } else { 376 s.logI("set meta, fd=%s", fd) 377 } 378 return err 379} 380 381func (s *Storage) fileClose(fd storage.FileDesc, closer io.Closer) (err error) { 382 err = s.emulateError(ModeClose, fd.Type) 383 if err == nil { 384 s.stall(ModeClose, fd.Type) 385 } 386 x := packFile(fd) 387 s.mu.Lock() 388 defer s.mu.Unlock() 389 if err == nil { 390 ExpectWithOffset(2, s.opens).To(HaveKey(x), "File closed, fd=%s", fd) 391 err = closer.Close() 392 } 393 s.countNB(ModeClose, fd.Type, 0) 394 writer := s.opens[x] 395 if err != nil { 396 s.logISkip(1, "file close failed, fd=%s writer=%v err=%v", fd, writer, err) 397 } else { 398 s.logISkip(1, "file closed, fd=%s writer=%v", fd, writer) 399 delete(s.opens, x) 400 } 401 return 402} 403 404func (s *Storage) assertOpen(fd storage.FileDesc) { 405 x := packFile(fd) 406 ExpectWithOffset(2, s.opens).NotTo(HaveKey(x), "File open, fd=%s writer=%v", fd, s.opens[x]) 407} 408 409func (s *Storage) Open(fd storage.FileDesc) (r storage.Reader, err error) { 410 err = s.emulateError(ModeOpen, fd.Type) 411 if err == nil { 412 s.stall(ModeOpen, fd.Type) 413 } 414 s.mu.Lock() 415 defer s.mu.Unlock() 416 if err == nil { 417 s.assertOpen(fd) 418 s.countNB(ModeOpen, fd.Type, 0) 419 r, err = s.Storage.Open(fd) 420 } 421 if err != nil { 422 s.logI("file open failed, fd=%s err=%v", fd, err) 423 } else { 424 s.logI("file opened, fd=%s", fd) 425 s.opens[packFile(fd)] = false 426 r = &reader{s, fd, r} 427 } 428 return 429} 430 431func (s *Storage) Create(fd storage.FileDesc) (w storage.Writer, err error) { 432 err = s.emulateError(ModeCreate, fd.Type) 433 if err == nil { 434 s.stall(ModeCreate, fd.Type) 435 } 436 s.mu.Lock() 437 defer s.mu.Unlock() 438 if err == nil { 439 s.assertOpen(fd) 440 s.countNB(ModeCreate, fd.Type, 0) 441 w, err = s.Storage.Create(fd) 442 } 443 if err != nil { 444 s.logI("file create failed, fd=%s err=%v", fd, err) 445 } else { 446 s.logI("file created, fd=%s", fd) 447 s.opens[packFile(fd)] = true 448 w = &writer{s, fd, w} 449 } 450 return 451} 452 453func (s *Storage) Remove(fd storage.FileDesc) (err error) { 454 err = s.emulateError(ModeRemove, fd.Type) 455 if err == nil { 456 s.stall(ModeRemove, fd.Type) 457 } 458 s.mu.Lock() 459 defer s.mu.Unlock() 460 if err == nil { 461 s.assertOpen(fd) 462 s.countNB(ModeRemove, fd.Type, 0) 463 err = s.Storage.Remove(fd) 464 } 465 if err != nil { 466 s.logI("file remove failed, fd=%s err=%v", fd, err) 467 } else { 468 s.logI("file removed, fd=%s", fd) 469 } 470 return 471} 472 473func (s *Storage) ForceRemove(fd storage.FileDesc) (err error) { 474 s.countNB(ModeRemove, fd.Type, 0) 475 if err = s.Storage.Remove(fd); err != nil { 476 s.logI("file remove failed (forced), fd=%s err=%v", fd, err) 477 } else { 478 s.logI("file removed (forced), fd=%s", fd) 479 } 480 return 481} 482 483func (s *Storage) Rename(oldfd, newfd storage.FileDesc) (err error) { 484 err = s.emulateError(ModeRename, oldfd.Type) 485 if err == nil { 486 s.stall(ModeRename, oldfd.Type) 487 } 488 s.mu.Lock() 489 defer s.mu.Unlock() 490 if err == nil { 491 s.assertOpen(oldfd) 492 s.assertOpen(newfd) 493 s.countNB(ModeRename, oldfd.Type, 0) 494 err = s.Storage.Rename(oldfd, newfd) 495 } 496 if err != nil { 497 s.logI("file rename failed, oldfd=%s newfd=%s err=%v", oldfd, newfd, err) 498 } else { 499 s.logI("file renamed, oldfd=%s newfd=%s", oldfd, newfd) 500 } 501 return 502} 503 504func (s *Storage) ForceRename(oldfd, newfd storage.FileDesc) (err error) { 505 s.countNB(ModeRename, oldfd.Type, 0) 506 if err = s.Storage.Rename(oldfd, newfd); err != nil { 507 s.logI("file rename failed (forced), oldfd=%s newfd=%s err=%v", oldfd, newfd, err) 508 } else { 509 s.logI("file renamed (forced), oldfd=%s newfd=%s", oldfd, newfd) 510 } 511 return 512} 513 514func (s *Storage) openFiles() string { 515 out := "Open files:" 516 for x, writer := range s.opens { 517 fd := unpackFile(x) 518 out += fmt.Sprintf("\n · fd=%s writer=%v", fd, writer) 519 } 520 return out 521} 522 523func (s *Storage) CloseCheck() { 524 s.mu.Lock() 525 defer s.mu.Unlock() 526 ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles()) 527} 528 529func (s *Storage) OnClose(onClose func() (preserve bool, err error)) { 530 s.mu.Lock() 531 s.onClose = onClose 532 s.mu.Unlock() 533} 534 535func (s *Storage) Close() error { 536 s.mu.Lock() 537 defer s.mu.Unlock() 538 ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles()) 539 err := s.Storage.Close() 540 if err != nil { 541 s.logI("storage closing failed, err=%v", err) 542 } else { 543 s.logI("storage closed") 544 } 545 var preserve bool 546 if s.onClose != nil { 547 var err0 error 548 if preserve, err0 = s.onClose(); err0 != nil { 549 s.logI("onClose error, err=%v", err0) 550 } 551 } 552 if s.path != "" { 553 if storageKeepFS || preserve { 554 s.logI("storage is preserved, path=%v", s.path) 555 } else { 556 if err1 := os.RemoveAll(s.path); err1 != nil { 557 s.logI("cannot remove storage, err=%v", err1) 558 } else { 559 s.logI("storage has been removed") 560 } 561 } 562 } 563 return err 564} 565 566func (s *Storage) countNB(m StorageMode, t storage.FileType, n int) { 567 s.counters[flattenType(m, t)]++ 568 s.bytesCounter[flattenType(m, t)] += int64(n) 569} 570 571func (s *Storage) count(m StorageMode, t storage.FileType, n int) { 572 s.mu.Lock() 573 defer s.mu.Unlock() 574 s.countNB(m, t, n) 575} 576 577func (s *Storage) ResetCounter(m StorageMode, t storage.FileType) { 578 for _, x := range listFlattenType(m, t) { 579 s.counters[x] = 0 580 s.bytesCounter[x] = 0 581 } 582} 583 584func (s *Storage) Counter(m StorageMode, t storage.FileType) (count int, bytes int64) { 585 for _, x := range listFlattenType(m, t) { 586 count += s.counters[x] 587 bytes += s.bytesCounter[x] 588 } 589 return 590} 591 592func (s *Storage) emulateError(m StorageMode, t storage.FileType) error { 593 s.mu.Lock() 594 defer s.mu.Unlock() 595 x := flattenType(m, t) 596 if err := s.emulatedError[x]; err != nil { 597 if s.emulatedErrorOnce[x] { 598 s.emulatedError[x] = nil 599 } 600 return emulatedError{err} 601 } 602 if err := s.emulatedRandomError[x]; err != nil && s.rand.Float64() < s.emulatedRandomErrorProb[x] { 603 return emulatedError{err} 604 } 605 return nil 606} 607 608func (s *Storage) EmulateError(m StorageMode, t storage.FileType, err error) { 609 s.mu.Lock() 610 defer s.mu.Unlock() 611 for _, x := range listFlattenType(m, t) { 612 s.emulatedError[x] = err 613 s.emulatedErrorOnce[x] = false 614 } 615} 616 617func (s *Storage) EmulateErrorOnce(m StorageMode, t storage.FileType, err error) { 618 s.mu.Lock() 619 defer s.mu.Unlock() 620 for _, x := range listFlattenType(m, t) { 621 s.emulatedError[x] = err 622 s.emulatedErrorOnce[x] = true 623 } 624} 625 626func (s *Storage) EmulateRandomError(m StorageMode, t storage.FileType, prob float64, err error) { 627 s.mu.Lock() 628 defer s.mu.Unlock() 629 for _, x := range listFlattenType(m, t) { 630 s.emulatedRandomError[x] = err 631 s.emulatedRandomErrorProb[x] = prob 632 } 633} 634 635func (s *Storage) stall(m StorageMode, t storage.FileType) { 636 x := flattenType(m, t) 637 s.mu.Lock() 638 defer s.mu.Unlock() 639 for s.stalled[x] { 640 s.stallCond.Wait() 641 } 642} 643 644func (s *Storage) Stall(m StorageMode, t storage.FileType) { 645 s.mu.Lock() 646 defer s.mu.Unlock() 647 for _, x := range listFlattenType(m, t) { 648 s.stalled[x] = true 649 } 650} 651 652func (s *Storage) Release(m StorageMode, t storage.FileType) { 653 s.mu.Lock() 654 defer s.mu.Unlock() 655 for _, x := range listFlattenType(m, t) { 656 s.stalled[x] = false 657 } 658 s.stallCond.Broadcast() 659} 660 661func NewStorage() *Storage { 662 var ( 663 stor storage.Storage 664 path string 665 ) 666 if storageUseFS { 667 for { 668 storageMu.Lock() 669 num := storageNum 670 storageNum++ 671 storageMu.Unlock() 672 path = filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num)) 673 if _, err := os.Stat(path); os.IsNotExist(err) { 674 stor, err = storage.OpenFile(path, false) 675 ExpectWithOffset(1, err).NotTo(HaveOccurred(), "creating storage at %s", path) 676 break 677 } 678 } 679 } else { 680 stor = storage.NewMemStorage() 681 } 682 s := &Storage{ 683 Storage: stor, 684 path: path, 685 rand: NewRand(), 686 opens: make(map[uint64]bool), 687 } 688 s.stallCond.L = &s.mu 689 if s.path != "" { 690 s.logI("using FS storage") 691 s.logI("storage path: %s", s.path) 692 } else { 693 s.logI("using MEM storage") 694 } 695 return s 696} 697