1package bbolt 2 3import ( 4 "fmt" 5 "io" 6 "os" 7 "sort" 8 "strings" 9 "time" 10 "unsafe" 11) 12 13// txid represents the internal transaction identifier. 14type txid uint64 15 16// Tx represents a read-only or read/write transaction on the database. 17// Read-only transactions can be used for retrieving values for keys and creating cursors. 18// Read/write transactions can create and remove buckets and create and remove keys. 19// 20// IMPORTANT: You must commit or rollback transactions when you are done with 21// them. Pages can not be reclaimed by the writer until no more transactions 22// are using them. A long running read transaction can cause the database to 23// quickly grow. 24type Tx struct { 25 writable bool 26 managed bool 27 db *DB 28 meta *meta 29 root Bucket 30 pages map[pgid]*page 31 stats TxStats 32 commitHandlers []func() 33 34 // WriteFlag specifies the flag for write-related methods like WriteTo(). 35 // Tx opens the database file with the specified flag to copy the data. 36 // 37 // By default, the flag is unset, which works well for mostly in-memory 38 // workloads. For databases that are much larger than available RAM, 39 // set the flag to syscall.O_DIRECT to avoid trashing the page cache. 40 WriteFlag int 41} 42 43// init initializes the transaction. 44func (tx *Tx) init(db *DB) { 45 tx.db = db 46 tx.pages = nil 47 48 // Copy the meta page since it can be changed by the writer. 49 tx.meta = &meta{} 50 db.meta().copy(tx.meta) 51 52 // Copy over the root bucket. 53 tx.root = newBucket(tx) 54 tx.root.bucket = &bucket{} 55 *tx.root.bucket = tx.meta.root 56 57 // Increment the transaction id and add a page cache for writable transactions. 58 if tx.writable { 59 tx.pages = make(map[pgid]*page) 60 tx.meta.txid += txid(1) 61 } 62} 63 64// ID returns the transaction id. 65func (tx *Tx) ID() int { 66 return int(tx.meta.txid) 67} 68 69// DB returns a reference to the database that created the transaction. 70func (tx *Tx) DB() *DB { 71 return tx.db 72} 73 74// Size returns current database size in bytes as seen by this transaction. 75func (tx *Tx) Size() int64 { 76 return int64(tx.meta.pgid) * int64(tx.db.pageSize) 77} 78 79// Writable returns whether the transaction can perform write operations. 80func (tx *Tx) Writable() bool { 81 return tx.writable 82} 83 84// Cursor creates a cursor associated with the root bucket. 85// All items in the cursor will return a nil value because all root bucket keys point to buckets. 86// The cursor is only valid as long as the transaction is open. 87// Do not use a cursor after the transaction is closed. 88func (tx *Tx) Cursor() *Cursor { 89 return tx.root.Cursor() 90} 91 92// Stats retrieves a copy of the current transaction statistics. 93func (tx *Tx) Stats() TxStats { 94 return tx.stats 95} 96 97// Bucket retrieves a bucket by name. 98// Returns nil if the bucket does not exist. 99// The bucket instance is only valid for the lifetime of the transaction. 100func (tx *Tx) Bucket(name []byte) *Bucket { 101 return tx.root.Bucket(name) 102} 103 104// CreateBucket creates a new bucket. 105// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long. 106// The bucket instance is only valid for the lifetime of the transaction. 107func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) { 108 return tx.root.CreateBucket(name) 109} 110 111// CreateBucketIfNotExists creates a new bucket if it doesn't already exist. 112// Returns an error if the bucket name is blank, or if the bucket name is too long. 113// The bucket instance is only valid for the lifetime of the transaction. 114func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) { 115 return tx.root.CreateBucketIfNotExists(name) 116} 117 118// DeleteBucket deletes a bucket. 119// Returns an error if the bucket cannot be found or if the key represents a non-bucket value. 120func (tx *Tx) DeleteBucket(name []byte) error { 121 return tx.root.DeleteBucket(name) 122} 123 124// ForEach executes a function for each bucket in the root. 125// If the provided function returns an error then the iteration is stopped and 126// the error is returned to the caller. 127func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error { 128 return tx.root.ForEach(func(k, v []byte) error { 129 return fn(k, tx.root.Bucket(k)) 130 }) 131} 132 133// OnCommit adds a handler function to be executed after the transaction successfully commits. 134func (tx *Tx) OnCommit(fn func()) { 135 tx.commitHandlers = append(tx.commitHandlers, fn) 136} 137 138// Commit writes all changes to disk and updates the meta page. 139// Returns an error if a disk write error occurs, or if Commit is 140// called on a read-only transaction. 141func (tx *Tx) Commit() error { 142 _assert(!tx.managed, "managed tx commit not allowed") 143 if tx.db == nil { 144 return ErrTxClosed 145 } else if !tx.writable { 146 return ErrTxNotWritable 147 } 148 149 // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. 150 151 // Rebalance nodes which have had deletions. 152 var startTime = time.Now() 153 tx.root.rebalance() 154 if tx.stats.Rebalance > 0 { 155 tx.stats.RebalanceTime += time.Since(startTime) 156 } 157 158 // spill data onto dirty pages. 159 startTime = time.Now() 160 if err := tx.root.spill(); err != nil { 161 tx.rollback() 162 return err 163 } 164 tx.stats.SpillTime += time.Since(startTime) 165 166 // Free the old root bucket. 167 tx.meta.root.root = tx.root.root 168 169 // Free the old freelist because commit writes out a fresh freelist. 170 if tx.meta.freelist != pgidNoFreelist { 171 tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist)) 172 } 173 174 if !tx.db.NoFreelistSync { 175 err := tx.commitFreelist() 176 if err != nil { 177 return err 178 } 179 } else { 180 tx.meta.freelist = pgidNoFreelist 181 } 182 183 // Write dirty pages to disk. 184 startTime = time.Now() 185 if err := tx.write(); err != nil { 186 tx.rollback() 187 return err 188 } 189 190 // If strict mode is enabled then perform a consistency check. 191 // Only the first consistency error is reported in the panic. 192 if tx.db.StrictMode { 193 ch := tx.Check() 194 var errs []string 195 for { 196 err, ok := <-ch 197 if !ok { 198 break 199 } 200 errs = append(errs, err.Error()) 201 } 202 if len(errs) > 0 { 203 panic("check fail: " + strings.Join(errs, "\n")) 204 } 205 } 206 207 // Write meta to disk. 208 if err := tx.writeMeta(); err != nil { 209 tx.rollback() 210 return err 211 } 212 tx.stats.WriteTime += time.Since(startTime) 213 214 // Finalize the transaction. 215 tx.close() 216 217 // Execute commit handlers now that the locks have been removed. 218 for _, fn := range tx.commitHandlers { 219 fn() 220 } 221 222 return nil 223} 224 225func (tx *Tx) commitFreelist() error { 226 // Allocate new pages for the new free list. This will overestimate 227 // the size of the freelist but not underestimate the size (which would be bad). 228 opgid := tx.meta.pgid 229 p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1) 230 if err != nil { 231 tx.rollback() 232 return err 233 } 234 if err := tx.db.freelist.write(p); err != nil { 235 tx.rollback() 236 return err 237 } 238 tx.meta.freelist = p.id 239 // If the high water mark has moved up then attempt to grow the database. 240 if tx.meta.pgid > opgid { 241 if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil { 242 tx.rollback() 243 return err 244 } 245 } 246 247 return nil 248} 249 250// Rollback closes the transaction and ignores all previous updates. Read-only 251// transactions must be rolled back and not committed. 252func (tx *Tx) Rollback() error { 253 _assert(!tx.managed, "managed tx rollback not allowed") 254 if tx.db == nil { 255 return ErrTxClosed 256 } 257 tx.nonPhysicalRollback() 258 return nil 259} 260 261// nonPhysicalRollback is called when user calls Rollback directly, in this case we do not need to reload the free pages from disk. 262func (tx *Tx) nonPhysicalRollback() { 263 if tx.db == nil { 264 return 265 } 266 if tx.writable { 267 tx.db.freelist.rollback(tx.meta.txid) 268 } 269 tx.close() 270} 271 272// rollback needs to reload the free pages from disk in case some system error happens like fsync error. 273func (tx *Tx) rollback() { 274 if tx.db == nil { 275 return 276 } 277 if tx.writable { 278 tx.db.freelist.rollback(tx.meta.txid) 279 if !tx.db.hasSyncedFreelist() { 280 // Reconstruct free page list by scanning the DB to get the whole free page list. 281 // Note: scaning the whole db is heavy if your db size is large in NoSyncFreeList mode. 282 tx.db.freelist.noSyncReload(tx.db.freepages()) 283 } else { 284 // Read free page list from freelist page. 285 tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist)) 286 } 287 } 288 tx.close() 289} 290 291func (tx *Tx) close() { 292 if tx.db == nil { 293 return 294 } 295 if tx.writable { 296 // Grab freelist stats. 297 var freelistFreeN = tx.db.freelist.free_count() 298 var freelistPendingN = tx.db.freelist.pending_count() 299 var freelistAlloc = tx.db.freelist.size() 300 301 // Remove transaction ref & writer lock. 302 tx.db.rwtx = nil 303 tx.db.rwlock.Unlock() 304 305 // Merge statistics. 306 tx.db.statlock.Lock() 307 tx.db.stats.FreePageN = freelistFreeN 308 tx.db.stats.PendingPageN = freelistPendingN 309 tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize 310 tx.db.stats.FreelistInuse = freelistAlloc 311 tx.db.stats.TxStats.add(&tx.stats) 312 tx.db.statlock.Unlock() 313 } else { 314 tx.db.removeTx(tx) 315 } 316 317 // Clear all references. 318 tx.db = nil 319 tx.meta = nil 320 tx.root = Bucket{tx: tx} 321 tx.pages = nil 322} 323 324// Copy writes the entire database to a writer. 325// This function exists for backwards compatibility. 326// 327// Deprecated; Use WriteTo() instead. 328func (tx *Tx) Copy(w io.Writer) error { 329 _, err := tx.WriteTo(w) 330 return err 331} 332 333// WriteTo writes the entire database to a writer. 334// If err == nil then exactly tx.Size() bytes will be written into the writer. 335func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) { 336 // Attempt to open reader with WriteFlag 337 f, err := tx.db.openFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0) 338 if err != nil { 339 return 0, err 340 } 341 defer func() { 342 if cerr := f.Close(); err == nil { 343 err = cerr 344 } 345 }() 346 347 // Generate a meta page. We use the same page data for both meta pages. 348 buf := make([]byte, tx.db.pageSize) 349 page := (*page)(unsafe.Pointer(&buf[0])) 350 page.flags = metaPageFlag 351 *page.meta() = *tx.meta 352 353 // Write meta 0. 354 page.id = 0 355 page.meta().checksum = page.meta().sum64() 356 nn, err := w.Write(buf) 357 n += int64(nn) 358 if err != nil { 359 return n, fmt.Errorf("meta 0 copy: %s", err) 360 } 361 362 // Write meta 1 with a lower transaction id. 363 page.id = 1 364 page.meta().txid -= 1 365 page.meta().checksum = page.meta().sum64() 366 nn, err = w.Write(buf) 367 n += int64(nn) 368 if err != nil { 369 return n, fmt.Errorf("meta 1 copy: %s", err) 370 } 371 372 // Move past the meta pages in the file. 373 if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil { 374 return n, fmt.Errorf("seek: %s", err) 375 } 376 377 // Copy data pages. 378 wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)) 379 n += wn 380 if err != nil { 381 return n, err 382 } 383 384 return n, nil 385} 386 387// CopyFile copies the entire database to file at the given path. 388// A reader transaction is maintained during the copy so it is safe to continue 389// using the database while a copy is in progress. 390func (tx *Tx) CopyFile(path string, mode os.FileMode) error { 391 f, err := tx.db.openFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode) 392 if err != nil { 393 return err 394 } 395 396 err = tx.Copy(f) 397 if err != nil { 398 _ = f.Close() 399 return err 400 } 401 return f.Close() 402} 403 404// Check performs several consistency checks on the database for this transaction. 405// An error is returned if any inconsistency is found. 406// 407// It can be safely run concurrently on a writable transaction. However, this 408// incurs a high cost for large databases and databases with a lot of subbuckets 409// because of caching. This overhead can be removed if running on a read-only 410// transaction, however, it is not safe to execute other writer transactions at 411// the same time. 412func (tx *Tx) Check() <-chan error { 413 ch := make(chan error) 414 go tx.check(ch) 415 return ch 416} 417 418func (tx *Tx) check(ch chan error) { 419 // Force loading free list if opened in ReadOnly mode. 420 tx.db.loadFreelist() 421 422 // Check if any pages are double freed. 423 freed := make(map[pgid]bool) 424 all := make([]pgid, tx.db.freelist.count()) 425 tx.db.freelist.copyall(all) 426 for _, id := range all { 427 if freed[id] { 428 ch <- fmt.Errorf("page %d: already freed", id) 429 } 430 freed[id] = true 431 } 432 433 // Track every reachable page. 434 reachable := make(map[pgid]*page) 435 reachable[0] = tx.page(0) // meta0 436 reachable[1] = tx.page(1) // meta1 437 if tx.meta.freelist != pgidNoFreelist { 438 for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ { 439 reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist) 440 } 441 } 442 443 // Recursively check buckets. 444 tx.checkBucket(&tx.root, reachable, freed, ch) 445 446 // Ensure all pages below high water mark are either reachable or freed. 447 for i := pgid(0); i < tx.meta.pgid; i++ { 448 _, isReachable := reachable[i] 449 if !isReachable && !freed[i] { 450 ch <- fmt.Errorf("page %d: unreachable unfreed", int(i)) 451 } 452 } 453 454 // Close the channel to signal completion. 455 close(ch) 456} 457 458func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) { 459 // Ignore inline buckets. 460 if b.root == 0 { 461 return 462 } 463 464 // Check every page used by this bucket. 465 b.tx.forEachPage(b.root, 0, func(p *page, _ int) { 466 if p.id > tx.meta.pgid { 467 ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid)) 468 } 469 470 // Ensure each page is only referenced once. 471 for i := pgid(0); i <= pgid(p.overflow); i++ { 472 var id = p.id + i 473 if _, ok := reachable[id]; ok { 474 ch <- fmt.Errorf("page %d: multiple references", int(id)) 475 } 476 reachable[id] = p 477 } 478 479 // We should only encounter un-freed leaf and branch pages. 480 if freed[p.id] { 481 ch <- fmt.Errorf("page %d: reachable freed", int(p.id)) 482 } else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 { 483 ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ()) 484 } 485 }) 486 487 // Check each bucket within this bucket. 488 _ = b.ForEach(func(k, v []byte) error { 489 if child := b.Bucket(k); child != nil { 490 tx.checkBucket(child, reachable, freed, ch) 491 } 492 return nil 493 }) 494} 495 496// allocate returns a contiguous block of memory starting at a given page. 497func (tx *Tx) allocate(count int) (*page, error) { 498 p, err := tx.db.allocate(tx.meta.txid, count) 499 if err != nil { 500 return nil, err 501 } 502 503 // Save to our page cache. 504 tx.pages[p.id] = p 505 506 // Update statistics. 507 tx.stats.PageCount += count 508 tx.stats.PageAlloc += count * tx.db.pageSize 509 510 return p, nil 511} 512 513// write writes any dirty pages to disk. 514func (tx *Tx) write() error { 515 // Sort pages by id. 516 pages := make(pages, 0, len(tx.pages)) 517 for _, p := range tx.pages { 518 pages = append(pages, p) 519 } 520 // Clear out page cache early. 521 tx.pages = make(map[pgid]*page) 522 sort.Sort(pages) 523 524 // Write pages to disk in order. 525 for _, p := range pages { 526 size := (int(p.overflow) + 1) * tx.db.pageSize 527 offset := int64(p.id) * int64(tx.db.pageSize) 528 529 // Write out page in "max allocation" sized chunks. 530 ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p)) 531 for { 532 // Limit our write to our max allocation size. 533 sz := size 534 if sz > maxAllocSize-1 { 535 sz = maxAllocSize - 1 536 } 537 538 // Write chunk to disk. 539 buf := ptr[:sz] 540 if _, err := tx.db.ops.writeAt(buf, offset); err != nil { 541 return err 542 } 543 544 // Update statistics. 545 tx.stats.Write++ 546 547 // Exit inner for loop if we've written all the chunks. 548 size -= sz 549 if size == 0 { 550 break 551 } 552 553 // Otherwise move offset forward and move pointer to next chunk. 554 offset += int64(sz) 555 ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz])) 556 } 557 } 558 559 // Ignore file sync if flag is set on DB. 560 if !tx.db.NoSync || IgnoreNoSync { 561 if err := fdatasync(tx.db); err != nil { 562 return err 563 } 564 } 565 566 // Put small pages back to page pool. 567 for _, p := range pages { 568 // Ignore page sizes over 1 page. 569 // These are allocated using make() instead of the page pool. 570 if int(p.overflow) != 0 { 571 continue 572 } 573 574 buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize] 575 576 // See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1 577 for i := range buf { 578 buf[i] = 0 579 } 580 tx.db.pagePool.Put(buf) 581 } 582 583 return nil 584} 585 586// writeMeta writes the meta to the disk. 587func (tx *Tx) writeMeta() error { 588 // Create a temporary buffer for the meta page. 589 buf := make([]byte, tx.db.pageSize) 590 p := tx.db.pageInBuffer(buf, 0) 591 tx.meta.write(p) 592 593 // Write the meta page to file. 594 if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil { 595 return err 596 } 597 if !tx.db.NoSync || IgnoreNoSync { 598 if err := fdatasync(tx.db); err != nil { 599 return err 600 } 601 } 602 603 // Update statistics. 604 tx.stats.Write++ 605 606 return nil 607} 608 609// page returns a reference to the page with a given id. 610// If page has been written to then a temporary buffered page is returned. 611func (tx *Tx) page(id pgid) *page { 612 // Check the dirty pages first. 613 if tx.pages != nil { 614 if p, ok := tx.pages[id]; ok { 615 return p 616 } 617 } 618 619 // Otherwise return directly from the mmap. 620 return tx.db.page(id) 621} 622 623// forEachPage iterates over every page within a given page and executes a function. 624func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) { 625 p := tx.page(pgid) 626 627 // Execute function. 628 fn(p, depth) 629 630 // Recursively loop over children. 631 if (p.flags & branchPageFlag) != 0 { 632 for i := 0; i < int(p.count); i++ { 633 elem := p.branchPageElement(uint16(i)) 634 tx.forEachPage(elem.pgid, depth+1, fn) 635 } 636 } 637} 638 639// Page returns page information for a given page number. 640// This is only safe for concurrent use when used by a writable transaction. 641func (tx *Tx) Page(id int) (*PageInfo, error) { 642 if tx.db == nil { 643 return nil, ErrTxClosed 644 } else if pgid(id) >= tx.meta.pgid { 645 return nil, nil 646 } 647 648 // Build the page info. 649 p := tx.db.page(pgid(id)) 650 info := &PageInfo{ 651 ID: id, 652 Count: int(p.count), 653 OverflowCount: int(p.overflow), 654 } 655 656 // Determine the type (or if it's free). 657 if tx.db.freelist.freed(pgid(id)) { 658 info.Type = "free" 659 } else { 660 info.Type = p.typ() 661 } 662 663 return info, nil 664} 665 666// TxStats represents statistics about the actions performed by the transaction. 667type TxStats struct { 668 // Page statistics. 669 PageCount int // number of page allocations 670 PageAlloc int // total bytes allocated 671 672 // Cursor statistics. 673 CursorCount int // number of cursors created 674 675 // Node statistics 676 NodeCount int // number of node allocations 677 NodeDeref int // number of node dereferences 678 679 // Rebalance statistics. 680 Rebalance int // number of node rebalances 681 RebalanceTime time.Duration // total time spent rebalancing 682 683 // Split/Spill statistics. 684 Split int // number of nodes split 685 Spill int // number of nodes spilled 686 SpillTime time.Duration // total time spent spilling 687 688 // Write statistics. 689 Write int // number of writes performed 690 WriteTime time.Duration // total time spent writing to disk 691} 692 693func (s *TxStats) add(other *TxStats) { 694 s.PageCount += other.PageCount 695 s.PageAlloc += other.PageAlloc 696 s.CursorCount += other.CursorCount 697 s.NodeCount += other.NodeCount 698 s.NodeDeref += other.NodeDeref 699 s.Rebalance += other.Rebalance 700 s.RebalanceTime += other.RebalanceTime 701 s.Split += other.Split 702 s.Spill += other.Spill 703 s.SpillTime += other.SpillTime 704 s.Write += other.Write 705 s.WriteTime += other.WriteTime 706} 707 708// Sub calculates and returns the difference between two sets of transaction stats. 709// This is useful when obtaining stats at two different points and time and 710// you need the performance counters that occurred within that time span. 711func (s *TxStats) Sub(other *TxStats) TxStats { 712 var diff TxStats 713 diff.PageCount = s.PageCount - other.PageCount 714 diff.PageAlloc = s.PageAlloc - other.PageAlloc 715 diff.CursorCount = s.CursorCount - other.CursorCount 716 diff.NodeCount = s.NodeCount - other.NodeCount 717 diff.NodeDeref = s.NodeDeref - other.NodeDeref 718 diff.Rebalance = s.Rebalance - other.Rebalance 719 diff.RebalanceTime = s.RebalanceTime - other.RebalanceTime 720 diff.Split = s.Split - other.Split 721 diff.Spill = s.Spill - other.Spill 722 diff.SpillTime = s.SpillTime - other.SpillTime 723 diff.Write = s.Write - other.Write 724 diff.WriteTime = s.WriteTime - other.WriteTime 725 return diff 726} 727