1// Package autobatch provides a go-datastore implementation that 2// automatically batches together writes by holding puts in memory until 3// a certain threshold is met. 4package autobatch 5 6import ( 7 ds "github.com/ipfs/go-datastore" 8 dsq "github.com/ipfs/go-datastore/query" 9) 10 11// Datastore implements a go-datastore. 12type Datastore struct { 13 child ds.Batching 14 15 // TODO: discuss making ds.Batch implement the full ds.Datastore interface 16 buffer map[ds.Key]op 17 maxBufferEntries int 18} 19 20type op struct { 21 delete bool 22 value []byte 23} 24 25// NewAutoBatching returns a new datastore that automatically 26// batches writes using the given Batching datastore. The size 27// of the memory pool is given by size. 28func NewAutoBatching(d ds.Batching, size int) *Datastore { 29 return &Datastore{ 30 child: d, 31 buffer: make(map[ds.Key]op, size), 32 maxBufferEntries: size, 33 } 34} 35 36// Delete deletes a key/value 37func (d *Datastore) Delete(k ds.Key) error { 38 d.buffer[k] = op{delete: true} 39 if len(d.buffer) > d.maxBufferEntries { 40 return d.Flush() 41 } 42 return nil 43} 44 45// Get retrieves a value given a key. 46func (d *Datastore) Get(k ds.Key) ([]byte, error) { 47 o, ok := d.buffer[k] 48 if ok { 49 if o.delete { 50 return nil, ds.ErrNotFound 51 } 52 return o.value, nil 53 } 54 55 return d.child.Get(k) 56} 57 58// Put stores a key/value. 59func (d *Datastore) Put(k ds.Key, val []byte) error { 60 d.buffer[k] = op{value: val} 61 if len(d.buffer) > d.maxBufferEntries { 62 return d.Flush() 63 } 64 return nil 65} 66 67// Sync flushes all operations on keys at or under the prefix 68// from the current batch to the underlying datastore 69func (d *Datastore) Sync(prefix ds.Key) error { 70 b, err := d.child.Batch() 71 if err != nil { 72 return err 73 } 74 75 for k, o := range d.buffer { 76 if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) { 77 continue 78 } 79 80 var err error 81 if o.delete { 82 err = b.Delete(k) 83 } else { 84 err = b.Put(k, o.value) 85 } 86 if err != nil { 87 return err 88 } 89 90 delete(d.buffer, k) 91 } 92 93 return b.Commit() 94} 95 96// Flush flushes the current batch to the underlying datastore. 97func (d *Datastore) Flush() error { 98 b, err := d.child.Batch() 99 if err != nil { 100 return err 101 } 102 103 for k, o := range d.buffer { 104 var err error 105 if o.delete { 106 err = b.Delete(k) 107 } else { 108 err = b.Put(k, o.value) 109 } 110 if err != nil { 111 return err 112 } 113 } 114 // clear out buffer 115 d.buffer = make(map[ds.Key]op, d.maxBufferEntries) 116 117 return b.Commit() 118} 119 120// Has checks if a key is stored. 121func (d *Datastore) Has(k ds.Key) (bool, error) { 122 o, ok := d.buffer[k] 123 if ok { 124 return !o.delete, nil 125 } 126 127 return d.child.Has(k) 128} 129 130// GetSize implements Datastore.GetSize 131func (d *Datastore) GetSize(k ds.Key) (int, error) { 132 o, ok := d.buffer[k] 133 if ok { 134 if o.delete { 135 return -1, ds.ErrNotFound 136 } 137 return len(o.value), nil 138 } 139 140 return d.child.GetSize(k) 141} 142 143// Query performs a query 144func (d *Datastore) Query(q dsq.Query) (dsq.Results, error) { 145 err := d.Flush() 146 if err != nil { 147 return nil, err 148 } 149 150 return d.child.Query(q) 151} 152 153// DiskUsage implements the PersistentDatastore interface. 154func (d *Datastore) DiskUsage() (uint64, error) { 155 return ds.DiskUsage(d.child) 156} 157 158func (d *Datastore) Close() error { 159 err1 := d.Flush() 160 err2 := d.child.Close() 161 if err1 != nil { 162 return err1 163 } 164 if err2 != nil { 165 return err2 166 } 167 return nil 168} 169