1package bbolt
2
3import (
4	"bytes"
5	"fmt"
6	"unsafe"
7)
8
const (
	// MaxKeySize is the maximum length of a key, in bytes.
	MaxKeySize = 32768

	// MaxValueSize is the maximum length of a value, in bytes.
	MaxValueSize = (1 << 31) - 2
)

// bucketHeaderSize is the size of the serialized bucket header that
// precedes the inline page data inside an inline bucket's value.
const bucketHeaderSize = int(unsafe.Sizeof(bucket{}))

// Lower/upper bounds for Bucket.FillPercent. NOTE(review): the clamping
// presumably happens in the node split code, which is not in this file —
// confirm there before relying on these values.
const (
	minFillPercent = 0.1
	maxFillPercent = 1.0
)

// DefaultFillPercent is the percentage that split pages are filled.
// This value can be changed by setting Bucket.FillPercent.
const DefaultFillPercent = 0.5
27
// Bucket represents a collection of key/value pairs inside the database.
type Bucket struct {
	// Embedded on-file header; provides the promoted fields root and sequence.
	*bucket
	tx       *Tx                // the associated transaction
	buckets  map[string]*Bucket // subbucket cache (writable transactions only)
	page     *page              // inline page reference
	rootNode *node              // materialized node for the root page.
	nodes    map[pgid]*node     // node cache (writable transactions only)

	// Sets the threshold for filling nodes when they split. By default,
	// the bucket will fill to 50% but it can be useful to increase this
	// amount if you know that your write workloads are mostly append-only.
	//
	// This is non-persisted across transactions so it must be set in every Tx.
	FillPercent float64
}
44
// bucket represents the on-file representation of a bucket.
// This is stored as the "value" of a bucket key. If the bucket is small enough,
// then its root page can be stored inline in the "value", after the bucket
// header. In the case of inline buckets, the "root" will be 0.
type bucket struct {
	root     pgid   // page id of the bucket's root-level page (0 when inline)
	sequence uint64 // monotonically incrementing, used by NextSequence()
}
53
54// newBucket returns a new bucket associated with a transaction.
55func newBucket(tx *Tx) Bucket {
56	var b = Bucket{tx: tx, FillPercent: DefaultFillPercent}
57	if tx.writable {
58		b.buckets = make(map[string]*Bucket)
59		b.nodes = make(map[pgid]*node)
60	}
61	return b
62}
63
// Tx returns the transaction that the bucket belongs to.
func (b *Bucket) Tx() *Tx {
	return b.tx
}
68
// Root returns the root page id of the bucket (0 for inline buckets).
func (b *Bucket) Root() pgid {
	return b.root
}
73
// Writable returns whether the bucket's transaction is writable.
func (b *Bucket) Writable() bool {
	return b.tx.writable
}
78
79// Cursor creates a cursor associated with the bucket.
80// The cursor is only valid as long as the transaction is open.
81// Do not use a cursor after the transaction is closed.
82func (b *Bucket) Cursor() *Cursor {
83	// Update transaction statistics.
84	b.tx.stats.CursorCount++
85
86	// Allocate and return a cursor.
87	return &Cursor{
88		bucket: b,
89		stack:  make([]elemRef, 0),
90	}
91}
92
93// Bucket retrieves a nested bucket by name.
94// Returns nil if the bucket does not exist.
95// The bucket instance is only valid for the lifetime of the transaction.
96func (b *Bucket) Bucket(name []byte) *Bucket {
97	if b.buckets != nil {
98		if child := b.buckets[string(name)]; child != nil {
99			return child
100		}
101	}
102
103	// Move cursor to key.
104	c := b.Cursor()
105	k, v, flags := c.seek(name)
106
107	// Return nil if the key doesn't exist or it is not a bucket.
108	if !bytes.Equal(name, k) || (flags&bucketLeafFlag) == 0 {
109		return nil
110	}
111
112	// Otherwise create a bucket and cache it.
113	var child = b.openBucket(v)
114	if b.buckets != nil {
115		b.buckets[string(name)] = child
116	}
117
118	return child
119}
120
// Helper method that re-interprets a sub-bucket value
// from a parent into a Bucket.
// The value layout is a serialized bucket header optionally followed by
// an inline page (when the header's root pgid is 0).
func (b *Bucket) openBucket(value []byte) *Bucket {
	var child = newBucket(b.tx)

	// If unaligned load/stores are broken on this arch and value is
	// unaligned simply clone to an aligned byte array.
	unaligned := brokenUnaligned && uintptr(unsafe.Pointer(&value[0]))&3 != 0

	if unaligned {
		value = cloneBytes(value)
	}

	// If this is a writable transaction then we need to copy the bucket entry.
	// Read-only transactions can point directly at the mmap entry.
	if b.tx.writable && !unaligned {
		child.bucket = &bucket{}
		*child.bucket = *(*bucket)(unsafe.Pointer(&value[0]))
	} else {
		// Aliasing is safe here: either the tx is read-only, or value was
		// just cloned above into a private, aligned buffer.
		child.bucket = (*bucket)(unsafe.Pointer(&value[0]))
	}

	// Save a reference to the inline page if the bucket is inline.
	if child.root == 0 {
		child.page = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
	}

	return &child
}
150
151// CreateBucket creates a new bucket at the given key and returns the new bucket.
152// Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
153// The bucket instance is only valid for the lifetime of the transaction.
154func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
155	if b.tx.db == nil {
156		return nil, ErrTxClosed
157	} else if !b.tx.writable {
158		return nil, ErrTxNotWritable
159	} else if len(key) == 0 {
160		return nil, ErrBucketNameRequired
161	}
162
163	// Move cursor to correct position.
164	c := b.Cursor()
165	k, _, flags := c.seek(key)
166
167	// Return an error if there is an existing key.
168	if bytes.Equal(key, k) {
169		if (flags & bucketLeafFlag) != 0 {
170			return nil, ErrBucketExists
171		}
172		return nil, ErrIncompatibleValue
173	}
174
175	// Create empty, inline bucket.
176	var bucket = Bucket{
177		bucket:      &bucket{},
178		rootNode:    &node{isLeaf: true},
179		FillPercent: DefaultFillPercent,
180	}
181	var value = bucket.write()
182
183	// Insert into node.
184	key = cloneBytes(key)
185	c.node().put(key, key, value, 0, bucketLeafFlag)
186
187	// Since subbuckets are not allowed on inline buckets, we need to
188	// dereference the inline page, if it exists. This will cause the bucket
189	// to be treated as a regular, non-inline bucket for the rest of the tx.
190	b.page = nil
191
192	return b.Bucket(key), nil
193}
194
195// CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
196// Returns an error if the bucket name is blank, or if the bucket name is too long.
197// The bucket instance is only valid for the lifetime of the transaction.
198func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
199	child, err := b.CreateBucket(key)
200	if err == ErrBucketExists {
201		return b.Bucket(key), nil
202	} else if err != nil {
203		return nil, err
204	}
205	return child, nil
206}
207
// DeleteBucket deletes a bucket at the given key.
// Returns an error if the bucket does not exists, or if the key represents a non-bucket value.
// All nested sub-buckets are deleted recursively and their pages are
// released to the freelist before the key itself is removed.
func (b *Bucket) DeleteBucket(key []byte) error {
	if b.tx.db == nil {
		return ErrTxClosed
	} else if !b.Writable() {
		return ErrTxNotWritable
	}

	// Move cursor to correct position.
	c := b.Cursor()
	k, _, flags := c.seek(key)

	// Return an error if bucket doesn't exist or is not a bucket.
	if !bytes.Equal(key, k) {
		return ErrBucketNotFound
	} else if (flags & bucketLeafFlag) == 0 {
		return ErrIncompatibleValue
	}

	// Recursively delete all child buckets.
	// NOTE(review): a nil value from iteration is treated as "this key is a
	// sub-bucket" — confirm against the cursor's contract for bucket leaves.
	child := b.Bucket(key)
	err := child.ForEach(func(k, v []byte) error {
		if v == nil {
			if err := child.DeleteBucket(k); err != nil {
				return fmt.Errorf("delete bucket: %s", err)
			}
		}
		return nil
	})
	if err != nil {
		return err
	}

	// Remove cached copy.
	delete(b.buckets, string(key))

	// Release all bucket pages to freelist. Caches are dropped first so
	// free() walks pages rather than stale materialized nodes.
	child.nodes = nil
	child.rootNode = nil
	child.free()

	// Delete the node if we have a matching key.
	// (The cursor is still positioned at key from the seek above.)
	c.node().del(key)

	return nil
}
255
256// Get retrieves the value for a key in the bucket.
257// Returns a nil value if the key does not exist or if the key is a nested bucket.
258// The returned value is only valid for the life of the transaction.
259func (b *Bucket) Get(key []byte) []byte {
260	k, v, flags := b.Cursor().seek(key)
261
262	// Return nil if this is a bucket.
263	if (flags & bucketLeafFlag) != 0 {
264		return nil
265	}
266
267	// If our target node isn't the same key as what's passed in then return nil.
268	if !bytes.Equal(key, k) {
269		return nil
270	}
271	return v
272}
273
274// Put sets the value for a key in the bucket.
275// If the key exist then its previous value will be overwritten.
276// Supplied value must remain valid for the life of the transaction.
277// Returns an error if the bucket was created from a read-only transaction, if the key is blank, if the key is too large, or if the value is too large.
278func (b *Bucket) Put(key []byte, value []byte) error {
279	if b.tx.db == nil {
280		return ErrTxClosed
281	} else if !b.Writable() {
282		return ErrTxNotWritable
283	} else if len(key) == 0 {
284		return ErrKeyRequired
285	} else if len(key) > MaxKeySize {
286		return ErrKeyTooLarge
287	} else if int64(len(value)) > MaxValueSize {
288		return ErrValueTooLarge
289	}
290
291	// Move cursor to correct position.
292	c := b.Cursor()
293	k, _, flags := c.seek(key)
294
295	// Return an error if there is an existing key with a bucket value.
296	if bytes.Equal(key, k) && (flags&bucketLeafFlag) != 0 {
297		return ErrIncompatibleValue
298	}
299
300	// Insert into node.
301	key = cloneBytes(key)
302	c.node().put(key, key, value, 0, 0)
303
304	return nil
305}
306
307// Delete removes a key from the bucket.
308// If the key does not exist then nothing is done and a nil error is returned.
309// Returns an error if the bucket was created from a read-only transaction.
310func (b *Bucket) Delete(key []byte) error {
311	if b.tx.db == nil {
312		return ErrTxClosed
313	} else if !b.Writable() {
314		return ErrTxNotWritable
315	}
316
317	// Move cursor to correct position.
318	c := b.Cursor()
319	k, _, flags := c.seek(key)
320
321	// Return nil if the key doesn't exist.
322	if !bytes.Equal(key, k) {
323		return nil
324	}
325
326	// Return an error if there is already existing bucket value.
327	if (flags & bucketLeafFlag) != 0 {
328		return ErrIncompatibleValue
329	}
330
331	// Delete the node if we have a matching key.
332	c.node().del(key)
333
334	return nil
335}
336
// Sequence returns the current integer for the bucket without incrementing it.
// Note: unlike SetSequence/NextSequence, this performs no transaction-state checks.
func (b *Bucket) Sequence() uint64 { return b.bucket.sequence }
339
// SetSequence updates the sequence number for the bucket.
// Returns an error if the transaction is closed or read-only.
func (b *Bucket) SetSequence(v uint64) error {
	if b.tx.db == nil {
		return ErrTxClosed
	} else if !b.Writable() {
		return ErrTxNotWritable
	}

	// Materialize the root node if it hasn't been already so that the
	// bucket will be saved during commit.
	if b.rootNode == nil {
		_ = b.node(b.root, nil)
	}

	// Set the new sequence value. (Previous comment said "increment";
	// that was a copy-paste from NextSequence — this assigns.)
	b.bucket.sequence = v
	return nil
}
358
359// NextSequence returns an autoincrementing integer for the bucket.
360func (b *Bucket) NextSequence() (uint64, error) {
361	if b.tx.db == nil {
362		return 0, ErrTxClosed
363	} else if !b.Writable() {
364		return 0, ErrTxNotWritable
365	}
366
367	// Materialize the root node if it hasn't been already so that the
368	// bucket will be saved during commit.
369	if b.rootNode == nil {
370		_ = b.node(b.root, nil)
371	}
372
373	// Increment and return the sequence.
374	b.bucket.sequence++
375	return b.bucket.sequence, nil
376}
377
378// ForEach executes a function for each key/value pair in a bucket.
379// If the provided function returns an error then the iteration is stopped and
380// the error is returned to the caller. The provided function must not modify
381// the bucket; this will result in undefined behavior.
382func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
383	if b.tx.db == nil {
384		return ErrTxClosed
385	}
386	c := b.Cursor()
387	for k, v := c.First(); k != nil; k, v = c.Next() {
388		if err := fn(k, v); err != nil {
389			return err
390		}
391	}
392	return nil
393}
394
// Stats returns stats on a bucket.
// It walks every page reachable from this bucket, recursing into
// sub-buckets, and aggregates page counts and byte-usage figures.
func (b *Bucket) Stats() BucketStats {
	var s, subStats BucketStats
	pageSize := b.tx.db.pageSize
	s.BucketN += 1
	if b.root == 0 {
		// A zero root means this bucket is stored inline in its parent.
		s.InlineBucketN += 1
	}
	b.forEachPage(func(p *page, depth int) {
		if (p.flags & leafPageFlag) != 0 {
			s.KeyN += int(p.count)

			// used totals the used bytes for the page
			used := pageHeaderSize

			if p.count != 0 {
				// If page has any elements, add all element headers.
				used += leafPageElementSize * int(p.count-1)

				// Add all element key, value sizes.
				// The computation takes advantage of the fact that the position
				// of the last element's key/value equals to the total of the sizes
				// of all previous elements' keys and values.
				// It also includes the last element's header.
				lastElement := p.leafPageElement(p.count - 1)
				used += int(lastElement.pos + lastElement.ksize + lastElement.vsize)
			}

			if b.root == 0 {
				// For inlined bucket just update the inline stats
				s.InlineBucketInuse += used
			} else {
				// For non-inlined bucket update all the leaf stats
				s.LeafPageN++
				s.LeafInuse += used
				s.LeafOverflowN += int(p.overflow)

				// Collect stats from sub-buckets.
				// Do that by iterating over all element headers
				// looking for the ones with the bucketLeafFlag.
				for i := uint16(0); i < p.count; i++ {
					e := p.leafPageElement(i)
					if (e.flags & bucketLeafFlag) != 0 {
						// For any bucket element, open the element value
						// and recursively call Stats on the contained bucket.
						subStats.Add(b.openBucket(e.value()).Stats())
					}
				}
			}
		} else if (p.flags & branchPageFlag) != 0 {
			s.BranchPageN++
			lastElement := p.branchPageElement(p.count - 1)

			// used totals the used bytes for the page
			// Add header and all element headers.
			used := pageHeaderSize + (branchPageElementSize * int(p.count-1))

			// Add size of all keys and values.
			// Again, use the fact that last element's position equals to
			// the total of key, value sizes of all previous elements.
			used += int(lastElement.pos + lastElement.ksize)
			s.BranchInuse += used
			s.BranchOverflowN += int(p.overflow)
		}

		// Keep track of maximum page depth.
		if depth+1 > s.Depth {
			s.Depth = (depth + 1)
		}
	})

	// Alloc stats can be computed from page counts and pageSize.
	s.BranchAlloc = (s.BranchPageN + s.BranchOverflowN) * pageSize
	s.LeafAlloc = (s.LeafPageN + s.LeafOverflowN) * pageSize

	// Add the max depth of sub-buckets to get total nested depth.
	s.Depth += subStats.Depth
	// Add the stats for all sub-buckets
	s.Add(subStats)
	return s
}
476
477// forEachPage iterates over every page in a bucket, including inline pages.
478func (b *Bucket) forEachPage(fn func(*page, int)) {
479	// If we have an inline page then just use that.
480	if b.page != nil {
481		fn(b.page, 0)
482		return
483	}
484
485	// Otherwise traverse the page hierarchy.
486	b.tx.forEachPage(b.root, 0, fn)
487}
488
489// forEachPageNode iterates over every page (or node) in a bucket.
490// This also includes inline pages.
491func (b *Bucket) forEachPageNode(fn func(*page, *node, int)) {
492	// If we have an inline page or root node then just use that.
493	if b.page != nil {
494		fn(b.page, nil, 0)
495		return
496	}
497	b._forEachPageNode(b.root, 0, fn)
498}
499
500func (b *Bucket) _forEachPageNode(pgid pgid, depth int, fn func(*page, *node, int)) {
501	var p, n = b.pageNode(pgid)
502
503	// Execute function.
504	fn(p, n, depth)
505
506	// Recursively loop over children.
507	if p != nil {
508		if (p.flags & branchPageFlag) != 0 {
509			for i := 0; i < int(p.count); i++ {
510				elem := p.branchPageElement(uint16(i))
511				b._forEachPageNode(elem.pgid, depth+1, fn)
512			}
513		}
514	} else {
515		if !n.isLeaf {
516			for _, inode := range n.inodes {
517				b._forEachPageNode(inode.pgid, depth+1, fn)
518			}
519		}
520	}
521}
522
// spill writes all the nodes for this bucket to dirty pages.
// Child buckets are spilled first (possibly serialized inline into this
// bucket's own value), then this bucket's node tree is spilled and the
// header's root pgid is updated to point at the new root page.
func (b *Bucket) spill() error {
	// Spill all child buckets first.
	for name, child := range b.buckets {
		// If the child bucket is small enough and it has no child buckets then
		// write it inline into the parent bucket's page. Otherwise spill it
		// like a normal bucket and make the parent value a pointer to the page.
		var value []byte
		if child.inlineable() {
			// Free the child's old pages; its entire contents are
			// re-serialized into the parent's value below.
			child.free()
			value = child.write()
		} else {
			if err := child.spill(); err != nil {
				return err
			}

			// Update the child bucket header in this bucket.
			value = make([]byte, unsafe.Sizeof(bucket{}))
			var bucket = (*bucket)(unsafe.Pointer(&value[0]))
			*bucket = *child.bucket
		}

		// Skip writing the bucket if there are no materialized nodes.
		// (No materialized nodes means the child was never modified.)
		if child.rootNode == nil {
			continue
		}

		// Update parent node: re-seek the child's key and overwrite its
		// serialized header/value in place.
		var c = b.Cursor()
		k, _, flags := c.seek([]byte(name))
		if !bytes.Equal([]byte(name), k) {
			panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
		}
		if flags&bucketLeafFlag == 0 {
			panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
		}
		c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag)
	}

	// Ignore if there's not a materialized root node.
	if b.rootNode == nil {
		return nil
	}

	// Spill nodes.
	if err := b.rootNode.spill(); err != nil {
		return err
	}
	// Spilling may have split the root; re-resolve the current root node.
	b.rootNode = b.rootNode.root()

	// Update the root node for this bucket.
	// The new root must already have been allocated below the tx's
	// high-water mark; anything else indicates corruption.
	if b.rootNode.pgid >= b.tx.meta.pgid {
		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
	}
	b.root = b.rootNode.pgid

	return nil
}
581
582// inlineable returns true if a bucket is small enough to be written inline
583// and if it contains no subbuckets. Otherwise returns false.
584func (b *Bucket) inlineable() bool {
585	var n = b.rootNode
586
587	// Bucket must only contain a single leaf node.
588	if n == nil || !n.isLeaf {
589		return false
590	}
591
592	// Bucket is not inlineable if it contains subbuckets or if it goes beyond
593	// our threshold for inline bucket size.
594	var size = pageHeaderSize
595	for _, inode := range n.inodes {
596		size += leafPageElementSize + len(inode.key) + len(inode.value)
597
598		if inode.flags&bucketLeafFlag != 0 {
599			return false
600		} else if size > b.maxInlineBucketSize() {
601			return false
602		}
603	}
604
605	return true
606}
607
// Returns the maximum total size of a bucket to make it a candidate for inlining.
// Fixed at one quarter of the database page size.
func (b *Bucket) maxInlineBucketSize() int {
	return b.tx.db.pageSize / 4
}
612
// write allocates and writes a bucket to a byte slice.
// The layout is the bucket header immediately followed by the root node
// serialized as a fake in-memory page — the inline representation stored
// as this bucket's value in its parent.
func (b *Bucket) write() []byte {
	// Allocate the appropriate size.
	var n = b.rootNode
	var value = make([]byte, bucketHeaderSize+n.size())

	// Write a bucket header.
	var bucket = (*bucket)(unsafe.Pointer(&value[0]))
	*bucket = *b.bucket

	// Convert byte slice to a fake page and write the root node.
	var p = (*page)(unsafe.Pointer(&value[bucketHeaderSize]))
	n.write(p)

	return value
}
629
630// rebalance attempts to balance all nodes.
631func (b *Bucket) rebalance() {
632	for _, n := range b.nodes {
633		n.rebalance()
634	}
635	for _, child := range b.buckets {
636		child.rebalance()
637	}
638}
639
// node creates a node from a page and associates it with a given parent.
// If a node for pgid has already been materialized it is returned from
// the cache instead of being re-read from disk.
func (b *Bucket) node(pgid pgid, parent *node) *node {
	_assert(b.nodes != nil, "nodes map expected")

	// Retrieve node if it's already been created.
	if n := b.nodes[pgid]; n != nil {
		return n
	}

	// Otherwise create a node and cache it.
	n := &node{bucket: b, parent: parent}
	if parent == nil {
		// No parent means this is the root node of the bucket.
		b.rootNode = n
	} else {
		parent.children = append(parent.children, n)
	}

	// Use the inline page if this is an inline bucket.
	var p = b.page
	if p == nil {
		p = b.tx.page(pgid)
	}

	// Read the page into the node and cache it.
	n.read(p)
	b.nodes[pgid] = n

	// Update statistics.
	b.tx.stats.NodeCount++

	return n
}
672
// free recursively frees all pages in the bucket.
// Inline buckets (root == 0) own no pages, so there is nothing to release.
func (b *Bucket) free() {
	if b.root == 0 {
		return
	}

	var tx = b.tx
	b.forEachPageNode(func(p *page, n *node, _ int) {
		if p != nil {
			// Unmaterialized page: return it to the freelist directly.
			tx.db.freelist.free(tx.meta.txid, p)
		} else {
			// Materialized node: let it release its own backing page.
			n.free()
		}
	})
	// Zero the root so repeated calls are no-ops.
	b.root = 0
}
689
690// dereference removes all references to the old mmap.
691func (b *Bucket) dereference() {
692	if b.rootNode != nil {
693		b.rootNode.root().dereference()
694	}
695
696	for _, child := range b.buckets {
697		child.dereference()
698	}
699}
700
// pageNode returns the in-memory node, if it exists.
// Otherwise returns the underlying page.
func (b *Bucket) pageNode(id pgid) (*page, *node) {
	// Inline buckets have a fake page embedded in their value so treat them
	// differently. We'll return the rootNode (if available) or the fake page.
	if b.root == 0 {
		// Inline buckets may only ever be asked for page 0.
		if id != 0 {
			panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
		}
		if b.rootNode != nil {
			return nil, b.rootNode
		}
		return b.page, nil
	}

	// Check the node cache for non-inline buckets.
	// (The cache is nil in read-only transactions.)
	if b.nodes != nil {
		if n := b.nodes[id]; n != nil {
			return nil, n
		}
	}

	// Finally lookup the page from the transaction if no node is materialized.
	return b.tx.page(id), nil
}
726
// BucketStats records statistics about resources used by a bucket.
type BucketStats struct {
	// Page count statistics.
	BranchPageN     int // number of logical branch pages
	BranchOverflowN int // number of physical branch overflow pages
	LeafPageN       int // number of logical leaf pages
	LeafOverflowN   int // number of physical leaf overflow pages

	// Tree statistics.
	KeyN  int // number of keys/value pairs
	Depth int // number of levels in B+tree

	// Page size utilization.
	BranchAlloc int // bytes allocated for physical branch pages
	BranchInuse int // bytes actually used for branch data
	LeafAlloc   int // bytes allocated for physical leaf pages
	LeafInuse   int // bytes actually used for leaf data

	// Bucket statistics
	BucketN           int // total number of buckets including the top bucket
	InlineBucketN     int // total number on inlined buckets
	InlineBucketInuse int // bytes used for inlined buckets (also accounted for in LeafInuse)
}

// Add merges other into s. Counters and byte totals accumulate, while
// Depth takes the maximum of the two values since it tracks the deepest
// tree rather than a sum.
func (s *BucketStats) Add(other BucketStats) {
	// Page counters.
	s.BranchPageN += other.BranchPageN
	s.BranchOverflowN += other.BranchOverflowN
	s.LeafPageN += other.LeafPageN
	s.LeafOverflowN += other.LeafOverflowN

	// Tree shape: key count accumulates, depth keeps the deeper value.
	s.KeyN += other.KeyN
	if other.Depth > s.Depth {
		s.Depth = other.Depth
	}

	// Byte usage.
	s.BranchAlloc += other.BranchAlloc
	s.BranchInuse += other.BranchInuse
	s.LeafAlloc += other.LeafAlloc
	s.LeafInuse += other.LeafInuse

	// Bucket counters.
	s.BucketN += other.BucketN
	s.InlineBucketN += other.InlineBucketN
	s.InlineBucketInuse += other.InlineBucketInuse
}
769
// cloneBytes returns a fresh copy of v that shares no backing storage
// with the original slice. A nil input yields an empty (non-nil) slice.
func cloneBytes(v []byte) []byte {
	out := make([]byte, len(v))
	copy(out, v)
	return out
}
776