1package bbolt
2
3import (
4	"fmt"
5	"io"
6	"os"
7	"sort"
8	"strings"
9	"time"
10	"unsafe"
11)
12
// txid represents the internal transaction identifier.
// It is an unsigned 64-bit counter; Tx.init advances the copied meta's txid
// by one whenever the transaction being initialized is writable.
type txid uint64
15
// Tx represents a read-only or read/write transaction on the database.
// Read-only transactions can be used for retrieving values for keys and creating cursors.
// Read/write transactions can create and remove buckets and create and remove keys.
//
// IMPORTANT: You must commit or rollback transactions when you are done with
// them. Pages can not be reclaimed by the writer until no more transactions
// are using them. A long running read transaction can cause the database to
// quickly grow.
type Tx struct {
	writable       bool           // true for read/write transactions; reported by Writable()
	managed        bool           // when set, calling Commit or Rollback trips an assertion
	db             *DB            // owning database; close() sets it to nil, which marks the tx as closed
	meta           *meta          // private copy of the meta page taken in init
	root           Bucket         // root bucket, built in init from the copied meta
	pages          map[pgid]*page // dirty page cache; only allocated for writable transactions
	stats          TxStats        // counters and timings accumulated by this transaction
	commitHandlers []func()       // callbacks registered via OnCommit, run after a successful Commit

	// WriteFlag specifies the flag for write-related methods like WriteTo().
	// Tx opens the database file with the specified flag to copy the data.
	//
	// By default, the flag is unset, which works well for mostly in-memory
	// workloads. For databases that are much larger than available RAM,
	// set the flag to syscall.O_DIRECT to avoid thrashing the page cache.
	WriteFlag int
}
42
43// init initializes the transaction.
44func (tx *Tx) init(db *DB) {
45	tx.db = db
46	tx.pages = nil
47
48	// Copy the meta page since it can be changed by the writer.
49	tx.meta = &meta{}
50	db.meta().copy(tx.meta)
51
52	// Copy over the root bucket.
53	tx.root = newBucket(tx)
54	tx.root.bucket = &bucket{}
55	*tx.root.bucket = tx.meta.root
56
57	// Increment the transaction id and add a page cache for writable transactions.
58	if tx.writable {
59		tx.pages = make(map[pgid]*page)
60		tx.meta.txid += txid(1)
61	}
62}
63
// ID returns the transaction id.
// The internal unsigned 64-bit txid is converted to int, so the value can be
// truncated on platforms where int is 32 bits wide.
// It reads the transaction's meta copy, which close() sets to nil, so it must
// not be called after the transaction has been committed or rolled back.
func (tx *Tx) ID() int {
	return int(tx.meta.txid)
}
68
// DB returns a reference to the database that created the transaction.
// The reference is cleared when the transaction is closed, so nil is returned
// after a commit or rollback.
func (tx *Tx) DB() *DB {
	return tx.db
}
73
74// Size returns current database size in bytes as seen by this transaction.
75func (tx *Tx) Size() int64 {
76	return int64(tx.meta.pgid) * int64(tx.db.pageSize)
77}
78
// Writable returns whether the transaction can perform write operations.
// It reports the same flag that init consults when deciding whether to
// allocate a dirty page cache and advance the transaction id.
func (tx *Tx) Writable() bool {
	return tx.writable
}
83
// Cursor creates a cursor associated with the root bucket.
// All items in the cursor will return a nil value because all root bucket keys point to buckets.
// The cursor is only valid as long as the transaction is open.
// Do not use a cursor after the transaction is closed.
//
// The call is forwarded to the root bucket's Cursor method.
func (tx *Tx) Cursor() *Cursor {
	return tx.root.Cursor()
}
91
// Stats retrieves a copy of the current transaction statistics.
// TxStats is returned by value, so counters updated by the transaction after
// this call are not reflected in the returned copy.
func (tx *Tx) Stats() TxStats {
	return tx.stats
}
96
// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
// The bucket instance is only valid for the lifetime of the transaction.
//
// The lookup is forwarded to the transaction's root bucket.
func (tx *Tx) Bucket(name []byte) *Bucket {
	return tx.root.Bucket(name)
}
103
// CreateBucket creates a new bucket.
// Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
//
// The bucket is created as a child of the transaction's root bucket.
func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
	return tx.root.CreateBucket(name)
}
110
// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
// Returns an error if the bucket name is blank, or if the bucket name is too long.
// The bucket instance is only valid for the lifetime of the transaction.
//
// The call is forwarded to the transaction's root bucket.
func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
	return tx.root.CreateBucketIfNotExists(name)
}
117
// DeleteBucket deletes a bucket.
// Returns an error if the bucket cannot be found or if the key represents a non-bucket value.
//
// The bucket is looked up by name in the transaction's root bucket.
func (tx *Tx) DeleteBucket(name []byte) error {
	return tx.root.DeleteBucket(name)
}
123
124// ForEach executes a function for each bucket in the root.
125// If the provided function returns an error then the iteration is stopped and
126// the error is returned to the caller.
127func (tx *Tx) ForEach(fn func(name []byte, b *Bucket) error) error {
128	return tx.root.ForEach(func(k, v []byte) error {
129		return fn(k, tx.root.Bucket(k))
130	})
131}
132
// OnCommit adds a handler function to be executed after the transaction successfully commits.
// Handlers are called by Commit in the order they were added, after the
// transaction has been closed. They are not called when Commit returns an error.
func (tx *Tx) OnCommit(fn func()) {
	tx.commitHandlers = append(tx.commitHandlers, fn)
}
137
138// Commit writes all changes to disk and updates the meta page.
139// Returns an error if a disk write error occurs, or if Commit is
140// called on a read-only transaction.
141func (tx *Tx) Commit() error {
142	_assert(!tx.managed, "managed tx commit not allowed")
143	if tx.db == nil {
144		return ErrTxClosed
145	} else if !tx.writable {
146		return ErrTxNotWritable
147	}
148
149	// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
150
151	// Rebalance nodes which have had deletions.
152	var startTime = time.Now()
153	tx.root.rebalance()
154	if tx.stats.Rebalance > 0 {
155		tx.stats.RebalanceTime += time.Since(startTime)
156	}
157
158	// spill data onto dirty pages.
159	startTime = time.Now()
160	if err := tx.root.spill(); err != nil {
161		tx.rollback()
162		return err
163	}
164	tx.stats.SpillTime += time.Since(startTime)
165
166	// Free the old root bucket.
167	tx.meta.root.root = tx.root.root
168
169	// Free the old freelist because commit writes out a fresh freelist.
170	if tx.meta.freelist != pgidNoFreelist {
171		tx.db.freelist.free(tx.meta.txid, tx.db.page(tx.meta.freelist))
172	}
173
174	if !tx.db.NoFreelistSync {
175		err := tx.commitFreelist()
176		if err != nil {
177			return err
178		}
179	} else {
180		tx.meta.freelist = pgidNoFreelist
181	}
182
183	// Write dirty pages to disk.
184	startTime = time.Now()
185	if err := tx.write(); err != nil {
186		tx.rollback()
187		return err
188	}
189
190	// If strict mode is enabled then perform a consistency check.
191	// Only the first consistency error is reported in the panic.
192	if tx.db.StrictMode {
193		ch := tx.Check()
194		var errs []string
195		for {
196			err, ok := <-ch
197			if !ok {
198				break
199			}
200			errs = append(errs, err.Error())
201		}
202		if len(errs) > 0 {
203			panic("check fail: " + strings.Join(errs, "\n"))
204		}
205	}
206
207	// Write meta to disk.
208	if err := tx.writeMeta(); err != nil {
209		tx.rollback()
210		return err
211	}
212	tx.stats.WriteTime += time.Since(startTime)
213
214	// Finalize the transaction.
215	tx.close()
216
217	// Execute commit handlers now that the locks have been removed.
218	for _, fn := range tx.commitHandlers {
219		fn()
220	}
221
222	return nil
223}
224
225func (tx *Tx) commitFreelist() error {
226	// Allocate new pages for the new free list. This will overestimate
227	// the size of the freelist but not underestimate the size (which would be bad).
228	opgid := tx.meta.pgid
229	p, err := tx.allocate((tx.db.freelist.size() / tx.db.pageSize) + 1)
230	if err != nil {
231		tx.rollback()
232		return err
233	}
234	if err := tx.db.freelist.write(p); err != nil {
235		tx.rollback()
236		return err
237	}
238	tx.meta.freelist = p.id
239	// If the high water mark has moved up then attempt to grow the database.
240	if tx.meta.pgid > opgid {
241		if err := tx.db.grow(int(tx.meta.pgid+1) * tx.db.pageSize); err != nil {
242			tx.rollback()
243			return err
244		}
245	}
246
247	return nil
248}
249
// Rollback closes the transaction and ignores all previous updates. Read-only
// transactions must be rolled back and not committed.
//
// It returns ErrTxClosed if the transaction has already been committed or
// rolled back, and nil otherwise. Rollback must not be called on a managed
// transaction; doing so trips the assertion at the top of the method.
func (tx *Tx) Rollback() error {
	_assert(!tx.managed, "managed tx rollback not allowed")
	if tx.db == nil {
		return ErrTxClosed
	}
	tx.rollback()
	return nil
}
260
261func (tx *Tx) rollback() {
262	if tx.db == nil {
263		return
264	}
265	if tx.writable {
266		tx.db.freelist.rollback(tx.meta.txid)
267		tx.db.freelist.reload(tx.db.page(tx.db.meta().freelist))
268	}
269	tx.close()
270}
271
272func (tx *Tx) close() {
273	if tx.db == nil {
274		return
275	}
276	if tx.writable {
277		// Grab freelist stats.
278		var freelistFreeN = tx.db.freelist.free_count()
279		var freelistPendingN = tx.db.freelist.pending_count()
280		var freelistAlloc = tx.db.freelist.size()
281
282		// Remove transaction ref & writer lock.
283		tx.db.rwtx = nil
284		tx.db.rwlock.Unlock()
285
286		// Merge statistics.
287		tx.db.statlock.Lock()
288		tx.db.stats.FreePageN = freelistFreeN
289		tx.db.stats.PendingPageN = freelistPendingN
290		tx.db.stats.FreeAlloc = (freelistFreeN + freelistPendingN) * tx.db.pageSize
291		tx.db.stats.FreelistInuse = freelistAlloc
292		tx.db.stats.TxStats.add(&tx.stats)
293		tx.db.statlock.Unlock()
294	} else {
295		tx.db.removeTx(tx)
296	}
297
298	// Clear all references.
299	tx.db = nil
300	tx.meta = nil
301	tx.root = Bucket{tx: tx}
302	tx.pages = nil
303}
304
// Copy writes the entire database to a writer.
// This function exists for backwards compatibility.
// It calls WriteTo and discards the byte count, returning only the error.
//
// Deprecated: Use WriteTo() instead.
func (tx *Tx) Copy(w io.Writer) error {
	_, err := tx.WriteTo(w)
	return err
}
313
314// WriteTo writes the entire database to a writer.
315// If err == nil then exactly tx.Size() bytes will be written into the writer.
316func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
317	// Attempt to open reader with WriteFlag
318	f, err := os.OpenFile(tx.db.path, os.O_RDONLY|tx.WriteFlag, 0)
319	if err != nil {
320		return 0, err
321	}
322	defer func() {
323		if cerr := f.Close(); err == nil {
324			err = cerr
325		}
326	}()
327
328	// Generate a meta page. We use the same page data for both meta pages.
329	buf := make([]byte, tx.db.pageSize)
330	page := (*page)(unsafe.Pointer(&buf[0]))
331	page.flags = metaPageFlag
332	*page.meta() = *tx.meta
333
334	// Write meta 0.
335	page.id = 0
336	page.meta().checksum = page.meta().sum64()
337	nn, err := w.Write(buf)
338	n += int64(nn)
339	if err != nil {
340		return n, fmt.Errorf("meta 0 copy: %s", err)
341	}
342
343	// Write meta 1 with a lower transaction id.
344	page.id = 1
345	page.meta().txid -= 1
346	page.meta().checksum = page.meta().sum64()
347	nn, err = w.Write(buf)
348	n += int64(nn)
349	if err != nil {
350		return n, fmt.Errorf("meta 1 copy: %s", err)
351	}
352
353	// Move past the meta pages in the file.
354	if _, err := f.Seek(int64(tx.db.pageSize*2), io.SeekStart); err != nil {
355		return n, fmt.Errorf("seek: %s", err)
356	}
357
358	// Copy data pages.
359	wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
360	n += wn
361	if err != nil {
362		return n, err
363	}
364
365	return n, nil
366}
367
368// CopyFile copies the entire database to file at the given path.
369// A reader transaction is maintained during the copy so it is safe to continue
370// using the database while a copy is in progress.
371func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
372	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, mode)
373	if err != nil {
374		return err
375	}
376
377	err = tx.Copy(f)
378	if err != nil {
379		_ = f.Close()
380		return err
381	}
382	return f.Close()
383}
384
// Check performs several consistency checks on the database for this transaction.
// Every inconsistency found is sent as an error on the returned channel, and
// the channel is closed once the checks have finished.
//
// The checks run in a separate goroutine that sends on an unbuffered channel,
// so the caller must keep receiving until the channel is closed; otherwise
// that goroutine stays blocked on its next send.
//
// It can be safely run concurrently on a writable transaction. However, this
// incurs a high cost for large databases and databases with a lot of subbuckets
// because of caching. This overhead can be removed if running on a read-only
// transaction, however, it is not safe to execute other writer transactions at
// the same time.
func (tx *Tx) Check() <-chan error {
	ch := make(chan error)
	go tx.check(ch)
	return ch
}
398
399func (tx *Tx) check(ch chan error) {
400	// Force loading free list if opened in ReadOnly mode.
401	tx.db.loadFreelist()
402
403	// Check if any pages are double freed.
404	freed := make(map[pgid]bool)
405	all := make([]pgid, tx.db.freelist.count())
406	tx.db.freelist.copyall(all)
407	for _, id := range all {
408		if freed[id] {
409			ch <- fmt.Errorf("page %d: already freed", id)
410		}
411		freed[id] = true
412	}
413
414	// Track every reachable page.
415	reachable := make(map[pgid]*page)
416	reachable[0] = tx.page(0) // meta0
417	reachable[1] = tx.page(1) // meta1
418	if tx.meta.freelist != pgidNoFreelist {
419		for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
420			reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
421		}
422	}
423
424	// Recursively check buckets.
425	tx.checkBucket(&tx.root, reachable, freed, ch)
426
427	// Ensure all pages below high water mark are either reachable or freed.
428	for i := pgid(0); i < tx.meta.pgid; i++ {
429		_, isReachable := reachable[i]
430		if !isReachable && !freed[i] {
431			ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
432		}
433	}
434
435	// Close the channel to signal completion.
436	close(ch)
437}
438
439func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, freed map[pgid]bool, ch chan error) {
440	// Ignore inline buckets.
441	if b.root == 0 {
442		return
443	}
444
445	// Check every page used by this bucket.
446	b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
447		if p.id > tx.meta.pgid {
448			ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
449		}
450
451		// Ensure each page is only referenced once.
452		for i := pgid(0); i <= pgid(p.overflow); i++ {
453			var id = p.id + i
454			if _, ok := reachable[id]; ok {
455				ch <- fmt.Errorf("page %d: multiple references", int(id))
456			}
457			reachable[id] = p
458		}
459
460		// We should only encounter un-freed leaf and branch pages.
461		if freed[p.id] {
462			ch <- fmt.Errorf("page %d: reachable freed", int(p.id))
463		} else if (p.flags&branchPageFlag) == 0 && (p.flags&leafPageFlag) == 0 {
464			ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), p.typ())
465		}
466	})
467
468	// Check each bucket within this bucket.
469	_ = b.ForEach(func(k, v []byte) error {
470		if child := b.Bucket(k); child != nil {
471			tx.checkBucket(child, reachable, freed, ch)
472		}
473		return nil
474	})
475}
476
// allocate returns a contiguous block of memory starting at a given page.
//
// It asks the database to allocate count pages on behalf of this
// transaction's id, records the returned page in the dirty page cache and
// adds the allocation to the transaction statistics. The error from the
// database allocation is returned unchanged.
//
// The page cache map is only created for writable transactions, so this must
// not be called on a read-only transaction (writing to a nil map panics).
func (tx *Tx) allocate(count int) (*page, error) {
	p, err := tx.db.allocate(tx.meta.txid, count)
	if err != nil {
		return nil, err
	}

	// Save to our page cache.
	tx.pages[p.id] = p

	// Update statistics: number of pages and number of bytes allocated.
	tx.stats.PageCount += count
	tx.stats.PageAlloc += count * tx.db.pageSize

	return p, nil
}
493
// write writes any dirty pages to disk.
//
// The dirty pages are written in ascending page id order, each one (together
// with its overflow pages) in chunks of at most maxAllocSize-1 bytes. Unless
// syncing is disabled the file is then synced. Finally the buffers of
// single-page entries are zeroed and handed back to the database page pool.
// The first write or sync error is returned; in that case no buffers are
// returned to the pool.
func (tx *Tx) write() error {
	// Sort pages by id.
	pages := make(pages, 0, len(tx.pages))
	for _, p := range tx.pages {
		pages = append(pages, p)
	}
	// Clear out page cache early.
	tx.pages = make(map[pgid]*page)
	sort.Sort(pages)

	// Write pages to disk in order.
	for _, p := range pages {
		// Total bytes to write: the page itself plus its overflow pages.
		size := (int(p.overflow) + 1) * tx.db.pageSize
		offset := int64(p.id) * int64(tx.db.pageSize)

		// Write out page in "max allocation" sized chunks.
		// ptr views the page's memory as a byte array so it can be sliced.
		ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
		for {
			// Limit our write to our max allocation size.
			sz := size
			if sz > maxAllocSize-1 {
				sz = maxAllocSize - 1
			}

			// Write chunk to disk.
			buf := ptr[:sz]
			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
				return err
			}

			// Update statistics.
			tx.stats.Write++

			// Exit inner for loop if we've written all the chunks.
			size -= sz
			if size == 0 {
				break
			}

			// Otherwise move offset forward and move pointer to next chunk.
			offset += int64(sz)
			ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
		}
	}

	// Sync the file unless NoSync is set on the DB; IgnoreNoSync forces the
	// sync regardless of that flag.
	if !tx.db.NoSync || IgnoreNoSync {
		if err := fdatasync(tx.db); err != nil {
			return err
		}
	}

	// Put small pages back to page pool.
	for _, p := range pages {
		// Ignore page sizes over 1 page.
		// These are allocated using make() instead of the page pool.
		if int(p.overflow) != 0 {
			continue
		}

		buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:tx.db.pageSize]

		// Zero the buffer before it is reused.
		// See https://go.googlesource.com/go/+/f03c9202c43e0abb130669852082117ca50aa9b1
		for i := range buf {
			buf[i] = 0
		}
		tx.db.pagePool.Put(buf)
	}

	return nil
}
566
567// writeMeta writes the meta to the disk.
568func (tx *Tx) writeMeta() error {
569	// Create a temporary buffer for the meta page.
570	buf := make([]byte, tx.db.pageSize)
571	p := tx.db.pageInBuffer(buf, 0)
572	tx.meta.write(p)
573
574	// Write the meta page to file.
575	if _, err := tx.db.ops.writeAt(buf, int64(p.id)*int64(tx.db.pageSize)); err != nil {
576		return err
577	}
578	if !tx.db.NoSync || IgnoreNoSync {
579		if err := fdatasync(tx.db); err != nil {
580			return err
581		}
582	}
583
584	// Update statistics.
585	tx.stats.Write++
586
587	return nil
588}
589
590// page returns a reference to the page with a given id.
591// If page has been written to then a temporary buffered page is returned.
592func (tx *Tx) page(id pgid) *page {
593	// Check the dirty pages first.
594	if tx.pages != nil {
595		if p, ok := tx.pages[id]; ok {
596			return p
597		}
598	}
599
600	// Otherwise return directly from the mmap.
601	return tx.db.page(id)
602}
603
604// forEachPage iterates over every page within a given page and executes a function.
605func (tx *Tx) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
606	p := tx.page(pgid)
607
608	// Execute function.
609	fn(p, depth)
610
611	// Recursively loop over children.
612	if (p.flags & branchPageFlag) != 0 {
613		for i := 0; i < int(p.count); i++ {
614			elem := p.branchPageElement(uint16(i))
615			tx.forEachPage(elem.pgid, depth+1, fn)
616		}
617	}
618}
619
620// Page returns page information for a given page number.
621// This is only safe for concurrent use when used by a writable transaction.
622func (tx *Tx) Page(id int) (*PageInfo, error) {
623	if tx.db == nil {
624		return nil, ErrTxClosed
625	} else if pgid(id) >= tx.meta.pgid {
626		return nil, nil
627	}
628
629	// Build the page info.
630	p := tx.db.page(pgid(id))
631	info := &PageInfo{
632		ID:            id,
633		Count:         int(p.count),
634		OverflowCount: int(p.overflow),
635	}
636
637	// Determine the type (or if it's free).
638	if tx.db.freelist.freed(pgid(id)) {
639		info.Type = "free"
640	} else {
641		info.Type = p.typ()
642	}
643
644	return info, nil
645}
646
// TxStats represents statistics about the actions performed by the transaction.
type TxStats struct {
	// Page statistics.
	PageCount int // number of page allocations
	PageAlloc int // total bytes allocated

	// Cursor statistics.
	CursorCount int // number of cursors created

	// Node statistics
	NodeCount int // number of node allocations
	NodeDeref int // number of node dereferences

	// Rebalance statistics.
	Rebalance     int           // number of node rebalances
	RebalanceTime time.Duration // total time spent rebalancing

	// Split/Spill statistics.
	Split     int           // number of nodes split
	Spill     int           // number of nodes spilled
	SpillTime time.Duration // total time spent spilling

	// Write statistics.
	Write     int           // number of writes performed
	WriteTime time.Duration // total time spent writing to disk
}

// add accumulates every counter and duration of other into s.
func (s *TxStats) add(other *TxStats) {
	*s = TxStats{
		PageCount:     s.PageCount + other.PageCount,
		PageAlloc:     s.PageAlloc + other.PageAlloc,
		CursorCount:   s.CursorCount + other.CursorCount,
		NodeCount:     s.NodeCount + other.NodeCount,
		NodeDeref:     s.NodeDeref + other.NodeDeref,
		Rebalance:     s.Rebalance + other.Rebalance,
		RebalanceTime: s.RebalanceTime + other.RebalanceTime,
		Split:         s.Split + other.Split,
		Spill:         s.Spill + other.Spill,
		SpillTime:     s.SpillTime + other.SpillTime,
		Write:         s.Write + other.Write,
		WriteTime:     s.WriteTime + other.WriteTime,
	}
}

// Sub returns the field-by-field difference s - other.
// It is useful when stats were sampled at two points in time and only the
// activity that happened in between is of interest. Neither s nor other is
// modified.
func (s *TxStats) Sub(other *TxStats) TxStats {
	return TxStats{
		PageCount:     s.PageCount - other.PageCount,
		PageAlloc:     s.PageAlloc - other.PageAlloc,
		CursorCount:   s.CursorCount - other.CursorCount,
		NodeCount:     s.NodeCount - other.NodeCount,
		NodeDeref:     s.NodeDeref - other.NodeDeref,
		Rebalance:     s.Rebalance - other.Rebalance,
		RebalanceTime: s.RebalanceTime - other.RebalanceTime,
		Split:         s.Split - other.Split,
		Spill:         s.Spill - other.Spill,
		SpillTime:     s.SpillTime - other.SpillTime,
		Write:         s.Write - other.Write,
		WriteTime:     s.WriteTime - other.WriteTime,
	}
}
708