1// Package autobatch provides a go-datastore implementation that
2// automatically batches together writes by holding puts in memory until
3// a certain threshold is met.
4package autobatch
5
6import (
7	ds "github.com/ipfs/go-datastore"
8	dsq "github.com/ipfs/go-datastore/query"
9)
10
// Datastore implements a go-datastore that holds writes (puts and
// deletes) in an in-memory buffer and pushes them to the wrapped
// Batching datastore once the buffer exceeds its size threshold.
//
// NOTE(review): there is no lock here, so the type does not appear to
// be safe for concurrent use — confirm callers serialize access.
type Datastore struct {
	child ds.Batching // underlying datastore that receives the batched writes

	// TODO: discuss making ds.Batch implement the full ds.Datastore interface

	// buffer holds the pending operation per key; a later Put/Delete on
	// the same key overwrites the earlier buffered op.
	buffer map[ds.Key]op
	// maxBufferEntries is the threshold above which Put/Delete trigger
	// an automatic Flush.
	maxBufferEntries int
}
19
// op records one buffered write for a key: either a deletion or a put
// of value.
type op struct {
	delete bool   // true: the key is to be removed from the child datastore
	value  []byte // payload for a put; unused when delete is true
}
24
25// NewAutoBatching returns a new datastore that automatically
26// batches writes using the given Batching datastore. The size
27// of the memory pool is given by size.
28func NewAutoBatching(d ds.Batching, size int) *Datastore {
29	return &Datastore{
30		child:            d,
31		buffer:           make(map[ds.Key]op, size),
32		maxBufferEntries: size,
33	}
34}
35
36// Delete deletes a key/value
37func (d *Datastore) Delete(k ds.Key) error {
38	d.buffer[k] = op{delete: true}
39	if len(d.buffer) > d.maxBufferEntries {
40		return d.Flush()
41	}
42	return nil
43}
44
45// Get retrieves a value given a key.
46func (d *Datastore) Get(k ds.Key) ([]byte, error) {
47	o, ok := d.buffer[k]
48	if ok {
49		if o.delete {
50			return nil, ds.ErrNotFound
51		}
52		return o.value, nil
53	}
54
55	return d.child.Get(k)
56}
57
58// Put stores a key/value.
59func (d *Datastore) Put(k ds.Key, val []byte) error {
60	d.buffer[k] = op{value: val}
61	if len(d.buffer) > d.maxBufferEntries {
62		return d.Flush()
63	}
64	return nil
65}
66
67// Sync flushes all operations on keys at or under the prefix
68// from the current batch to the underlying datastore
69func (d *Datastore) Sync(prefix ds.Key) error {
70	b, err := d.child.Batch()
71	if err != nil {
72		return err
73	}
74
75	for k, o := range d.buffer {
76		if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) {
77			continue
78		}
79
80		var err error
81		if o.delete {
82			err = b.Delete(k)
83		} else {
84			err = b.Put(k, o.value)
85		}
86		if err != nil {
87			return err
88		}
89
90		delete(d.buffer, k)
91	}
92
93	return b.Commit()
94}
95
96// Flush flushes the current batch to the underlying datastore.
97func (d *Datastore) Flush() error {
98	b, err := d.child.Batch()
99	if err != nil {
100		return err
101	}
102
103	for k, o := range d.buffer {
104		var err error
105		if o.delete {
106			err = b.Delete(k)
107		} else {
108			err = b.Put(k, o.value)
109		}
110		if err != nil {
111			return err
112		}
113	}
114	// clear out buffer
115	d.buffer = make(map[ds.Key]op, d.maxBufferEntries)
116
117	return b.Commit()
118}
119
120// Has checks if a key is stored.
121func (d *Datastore) Has(k ds.Key) (bool, error) {
122	o, ok := d.buffer[k]
123	if ok {
124		return !o.delete, nil
125	}
126
127	return d.child.Has(k)
128}
129
130// GetSize implements Datastore.GetSize
131func (d *Datastore) GetSize(k ds.Key) (int, error) {
132	o, ok := d.buffer[k]
133	if ok {
134		if o.delete {
135			return -1, ds.ErrNotFound
136		}
137		return len(o.value), nil
138	}
139
140	return d.child.GetSize(k)
141}
142
143// Query performs a query
144func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) {
145	err := d.Flush()
146	if err != nil {
147		return nil, err
148	}
149
150	return d.child.Query(q)
151}
152
// DiskUsage implements the PersistentDatastore interface by delegating
// to the child datastore. Buffered (unflushed) writes are held in
// memory only and are not reflected in the reported size.
func (d *Datastore) DiskUsage() (uint64, error) {
	return ds.DiskUsage(d.child)
}
157
158func (d *Datastore) Close() error {
159	err1 := d.Flush()
160	err2 := d.child.Close()
161	if err1 != nil {
162		return err1
163	}
164	if err2 != nil {
165		return err2
166	}
167	return nil
168}
169