1package memdb
2
3import (
4	"bytes"
5	"fmt"
6	"strings"
7	"sync/atomic"
8	"unsafe"
9
10	iradix "github.com/hashicorp/go-immutable-radix"
11)
12
const (
	// id is the name of the index that Insert, Delete and DeletePrefix use to
	// derive an object's primary key.
	id = "id"
)
16
var (
	// ErrNotFound is returned when the requested item is not found, for
	// example by Delete when the object is not present in the table.
	ErrNotFound = fmt.Errorf("not found")
)
21
// tableIndex is a tuple of (Table, Index) used for lookups. It is the key
// type of Txn.modified, identifying one index of one table.
type tableIndex struct {
	Table string // name of the table
	Index string // name of the index within that table
}
27
// Txn is a transaction against a MemDB.
// This can be a read or write transaction.
type Txn struct {
	db      *MemDB      // database this transaction operates on
	write   bool        // true for a write transaction, false for read-only
	rootTxn *iradix.Txn // transaction on the root tree; Commit and Abort set it to nil
	after   []func()    // functions registered with Defer, run by Commit

	// changes is used to track the changes performed during the transaction. If
	// it is nil at transaction start then changes are not tracked.
	changes Changes

	// modified holds the open per-(table, index) write sub-transactions. It is
	// created lazily by writableIndex and reset to nil by Commit and Abort.
	modified map[tableIndex]*iradix.Txn
}
42
43// TrackChanges enables change tracking for the transaction. If called at any
44// point before commit, subsequent mutations will be recorded and can be
45// retrieved using ChangeSet. Once this has been called on a transaction it
46// can't be unset. As with other Txn methods it's not safe to call this from a
47// different goroutine than the one making mutations or committing the
48// transaction.
49func (txn *Txn) TrackChanges() {
50	if txn.changes == nil {
51		txn.changes = make(Changes, 0, 1)
52	}
53}
54
55// readableIndex returns a transaction usable for reading the given index in a
56// table. If the transaction is a write transaction with modifications, a clone of the
57// modified index will be returned.
58func (txn *Txn) readableIndex(table, index string) *iradix.Txn {
59	// Look for existing transaction
60	if txn.write && txn.modified != nil {
61		key := tableIndex{table, index}
62		exist, ok := txn.modified[key]
63		if ok {
64			return exist.Clone()
65		}
66	}
67
68	// Create a read transaction
69	path := indexPath(table, index)
70	raw, _ := txn.rootTxn.Get(path)
71	indexTxn := raw.(*iradix.Tree).Txn()
72	return indexTxn
73}
74
75// writableIndex returns a transaction usable for modifying the
76// given index in a table.
77func (txn *Txn) writableIndex(table, index string) *iradix.Txn {
78	if txn.modified == nil {
79		txn.modified = make(map[tableIndex]*iradix.Txn)
80	}
81
82	// Look for existing transaction
83	key := tableIndex{table, index}
84	exist, ok := txn.modified[key]
85	if ok {
86		return exist
87	}
88
89	// Start a new transaction
90	path := indexPath(table, index)
91	raw, _ := txn.rootTxn.Get(path)
92	indexTxn := raw.(*iradix.Tree).Txn()
93
94	// If we are the primary DB, enable mutation tracking. Snapshots should
95	// not notify, otherwise we will trigger watches on the primary DB when
96	// the writes will not be visible.
97	indexTxn.TrackMutate(txn.db.primary)
98
99	// Keep this open for the duration of the txn
100	txn.modified[key] = indexTxn
101	return indexTxn
102}
103
104// Abort is used to cancel this transaction.
105// This is a noop for read transactions.
106func (txn *Txn) Abort() {
107	// Noop for a read transaction
108	if !txn.write {
109		return
110	}
111
112	// Check if already aborted or committed
113	if txn.rootTxn == nil {
114		return
115	}
116
117	// Clear the txn
118	txn.rootTxn = nil
119	txn.modified = nil
120	txn.changes = nil
121
122	// Release the writer lock since this is invalid
123	txn.db.writer.Unlock()
124}
125
126// Commit is used to finalize this transaction.
127// This is a noop for read transactions.
128func (txn *Txn) Commit() {
129	// Noop for a read transaction
130	if !txn.write {
131		return
132	}
133
134	// Check if already aborted or committed
135	if txn.rootTxn == nil {
136		return
137	}
138
139	// Commit each sub-transaction scoped to (table, index)
140	for key, subTxn := range txn.modified {
141		path := indexPath(key.Table, key.Index)
142		final := subTxn.CommitOnly()
143		txn.rootTxn.Insert(path, final)
144	}
145
146	// Update the root of the DB
147	newRoot := txn.rootTxn.CommitOnly()
148	atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot))
149
150	// Now issue all of the mutation updates (this is safe to call
151	// even if mutation tracking isn't enabled); we do this after
152	// the root pointer is swapped so that waking responders will
153	// see the new state.
154	for _, subTxn := range txn.modified {
155		subTxn.Notify()
156	}
157	txn.rootTxn.Notify()
158
159	// Clear the txn
160	txn.rootTxn = nil
161	txn.modified = nil
162
163	// Release the writer lock since this is invalid
164	txn.db.writer.Unlock()
165
166	// Run the deferred functions, if any
167	for i := len(txn.after); i > 0; i-- {
168		fn := txn.after[i-1]
169		fn()
170	}
171}
172
173// Insert is used to add or update an object into the given table
174func (txn *Txn) Insert(table string, obj interface{}) error {
175	if !txn.write {
176		return fmt.Errorf("cannot insert in read-only transaction")
177	}
178
179	// Get the table schema
180	tableSchema, ok := txn.db.schema.Tables[table]
181	if !ok {
182		return fmt.Errorf("invalid table '%s'", table)
183	}
184
185	// Get the primary ID of the object
186	idSchema := tableSchema.Indexes[id]
187	idIndexer := idSchema.Indexer.(SingleIndexer)
188	ok, idVal, err := idIndexer.FromObject(obj)
189	if err != nil {
190		return fmt.Errorf("failed to build primary index: %v", err)
191	}
192	if !ok {
193		return fmt.Errorf("object missing primary index")
194	}
195
196	// Lookup the object by ID first, to see if this is an update
197	idTxn := txn.writableIndex(table, id)
198	existing, update := idTxn.Get(idVal)
199
200	// On an update, there is an existing object with the given
201	// primary ID. We do the update by deleting the current object
202	// and inserting the new object.
203	for name, indexSchema := range tableSchema.Indexes {
204		indexTxn := txn.writableIndex(table, name)
205
206		// Determine the new index value
207		var (
208			ok   bool
209			vals [][]byte
210			err  error
211		)
212		switch indexer := indexSchema.Indexer.(type) {
213		case SingleIndexer:
214			var val []byte
215			ok, val, err = indexer.FromObject(obj)
216			vals = [][]byte{val}
217		case MultiIndexer:
218			ok, vals, err = indexer.FromObject(obj)
219		}
220		if err != nil {
221			return fmt.Errorf("failed to build index '%s': %v", name, err)
222		}
223
224		// Handle non-unique index by computing a unique index.
225		// This is done by appending the primary key which must
226		// be unique anyways.
227		if ok && !indexSchema.Unique {
228			for i := range vals {
229				vals[i] = append(vals[i], idVal...)
230			}
231		}
232
233		// Handle the update by deleting from the index first
234		if update {
235			var (
236				okExist   bool
237				valsExist [][]byte
238				err       error
239			)
240			switch indexer := indexSchema.Indexer.(type) {
241			case SingleIndexer:
242				var valExist []byte
243				okExist, valExist, err = indexer.FromObject(existing)
244				valsExist = [][]byte{valExist}
245			case MultiIndexer:
246				okExist, valsExist, err = indexer.FromObject(existing)
247			}
248			if err != nil {
249				return fmt.Errorf("failed to build index '%s': %v", name, err)
250			}
251			if okExist {
252				for i, valExist := range valsExist {
253					// Handle non-unique index by computing a unique index.
254					// This is done by appending the primary key which must
255					// be unique anyways.
256					if !indexSchema.Unique {
257						valExist = append(valExist, idVal...)
258					}
259
260					// If we are writing to the same index with the same value,
261					// we can avoid the delete as the insert will overwrite the
262					// value anyways.
263					if i >= len(vals) || !bytes.Equal(valExist, vals[i]) {
264						indexTxn.Delete(valExist)
265					}
266				}
267			}
268		}
269
270		// If there is no index value, either this is an error or an expected
271		// case and we can skip updating
272		if !ok {
273			if indexSchema.AllowMissing {
274				continue
275			} else {
276				return fmt.Errorf("missing value for index '%s'", name)
277			}
278		}
279
280		// Update the value of the index
281		for _, val := range vals {
282			indexTxn.Insert(val, obj)
283		}
284	}
285	if txn.changes != nil {
286		txn.changes = append(txn.changes, Change{
287			Table:      table,
288			Before:     existing, // might be nil on a create
289			After:      obj,
290			primaryKey: idVal,
291		})
292	}
293	return nil
294}
295
296// Delete is used to delete a single object from the given table
297// This object must already exist in the table
298func (txn *Txn) Delete(table string, obj interface{}) error {
299	if !txn.write {
300		return fmt.Errorf("cannot delete in read-only transaction")
301	}
302
303	// Get the table schema
304	tableSchema, ok := txn.db.schema.Tables[table]
305	if !ok {
306		return fmt.Errorf("invalid table '%s'", table)
307	}
308
309	// Get the primary ID of the object
310	idSchema := tableSchema.Indexes[id]
311	idIndexer := idSchema.Indexer.(SingleIndexer)
312	ok, idVal, err := idIndexer.FromObject(obj)
313	if err != nil {
314		return fmt.Errorf("failed to build primary index: %v", err)
315	}
316	if !ok {
317		return fmt.Errorf("object missing primary index")
318	}
319
320	// Lookup the object by ID first, check fi we should continue
321	idTxn := txn.writableIndex(table, id)
322	existing, ok := idTxn.Get(idVal)
323	if !ok {
324		return ErrNotFound
325	}
326
327	// Remove the object from all the indexes
328	for name, indexSchema := range tableSchema.Indexes {
329		indexTxn := txn.writableIndex(table, name)
330
331		// Handle the update by deleting from the index first
332		var (
333			ok   bool
334			vals [][]byte
335			err  error
336		)
337		switch indexer := indexSchema.Indexer.(type) {
338		case SingleIndexer:
339			var val []byte
340			ok, val, err = indexer.FromObject(existing)
341			vals = [][]byte{val}
342		case MultiIndexer:
343			ok, vals, err = indexer.FromObject(existing)
344		}
345		if err != nil {
346			return fmt.Errorf("failed to build index '%s': %v", name, err)
347		}
348		if ok {
349			// Handle non-unique index by computing a unique index.
350			// This is done by appending the primary key which must
351			// be unique anyways.
352			for _, val := range vals {
353				if !indexSchema.Unique {
354					val = append(val, idVal...)
355				}
356				indexTxn.Delete(val)
357			}
358		}
359	}
360	if txn.changes != nil {
361		txn.changes = append(txn.changes, Change{
362			Table:      table,
363			Before:     existing,
364			After:      nil, // Now nil indicates deletion
365			primaryKey: idVal,
366		})
367	}
368	return nil
369}
370
371// DeletePrefix is used to delete an entire subtree based on a prefix.
372// The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete.
373// These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation.
374// This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects.
375func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (bool, error) {
376	if !txn.write {
377		return false, fmt.Errorf("cannot delete in read-only transaction")
378	}
379
380	if !strings.HasSuffix(prefix_index, "_prefix") {
381		return false, fmt.Errorf("Index name for DeletePrefix must be a prefix index, Got %v ", prefix_index)
382	}
383
384	deletePrefixIndex := strings.TrimSuffix(prefix_index, "_prefix")
385
386	// Get an iterator over all of the keys with the given prefix.
387	entries, err := txn.Get(table, prefix_index, prefix)
388	if err != nil {
389		return false, fmt.Errorf("failed kvs lookup: %s", err)
390	}
391	// Get the table schema
392	tableSchema, ok := txn.db.schema.Tables[table]
393	if !ok {
394		return false, fmt.Errorf("invalid table '%s'", table)
395	}
396
397	foundAny := false
398	for entry := entries.Next(); entry != nil; entry = entries.Next() {
399		if !foundAny {
400			foundAny = true
401		}
402		// Get the primary ID of the object
403		idSchema := tableSchema.Indexes[id]
404		idIndexer := idSchema.Indexer.(SingleIndexer)
405		ok, idVal, err := idIndexer.FromObject(entry)
406		if err != nil {
407			return false, fmt.Errorf("failed to build primary index: %v", err)
408		}
409		if !ok {
410			return false, fmt.Errorf("object missing primary index")
411		}
412		if txn.changes != nil {
413			// Record the deletion
414			idTxn := txn.writableIndex(table, id)
415			existing, ok := idTxn.Get(idVal)
416			if ok {
417				txn.changes = append(txn.changes, Change{
418					Table:      table,
419					Before:     existing,
420					After:      nil, // Now nil indicates deletion
421					primaryKey: idVal,
422				})
423			}
424		}
425		// Remove the object from all the indexes except the given prefix index
426		for name, indexSchema := range tableSchema.Indexes {
427			if name == deletePrefixIndex {
428				continue
429			}
430			indexTxn := txn.writableIndex(table, name)
431
432			// Handle the update by deleting from the index first
433			var (
434				ok   bool
435				vals [][]byte
436				err  error
437			)
438			switch indexer := indexSchema.Indexer.(type) {
439			case SingleIndexer:
440				var val []byte
441				ok, val, err = indexer.FromObject(entry)
442				vals = [][]byte{val}
443			case MultiIndexer:
444				ok, vals, err = indexer.FromObject(entry)
445			}
446			if err != nil {
447				return false, fmt.Errorf("failed to build index '%s': %v", name, err)
448			}
449
450			if ok {
451				// Handle non-unique index by computing a unique index.
452				// This is done by appending the primary key which must
453				// be unique anyways.
454				for _, val := range vals {
455					if !indexSchema.Unique {
456						val = append(val, idVal...)
457					}
458					indexTxn.Delete(val)
459				}
460			}
461		}
462
463	}
464	if foundAny {
465		indexTxn := txn.writableIndex(table, deletePrefixIndex)
466		ok = indexTxn.DeletePrefix([]byte(prefix))
467		if !ok {
468			panic(fmt.Errorf("prefix %v matched some entries but DeletePrefix did not delete any ", prefix))
469		}
470		return true, nil
471	}
472	return false, nil
473}
474
475// DeleteAll is used to delete all the objects in a given table
476// matching the constraints on the index
477func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) {
478	if !txn.write {
479		return 0, fmt.Errorf("cannot delete in read-only transaction")
480	}
481
482	// Get all the objects
483	iter, err := txn.Get(table, index, args...)
484	if err != nil {
485		return 0, err
486	}
487
488	// Put them into a slice so there are no safety concerns while actually
489	// performing the deletes
490	var objs []interface{}
491	for {
492		obj := iter.Next()
493		if obj == nil {
494			break
495		}
496
497		objs = append(objs, obj)
498	}
499
500	// Do the deletes
501	num := 0
502	for _, obj := range objs {
503		if err := txn.Delete(table, obj); err != nil {
504			return num, err
505		}
506		num++
507	}
508	return num, nil
509}
510
511// FirstWatch is used to return the first matching object for
512// the given constraints on the index along with the watch channel
513func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
514	// Get the index value
515	indexSchema, val, err := txn.getIndexValue(table, index, args...)
516	if err != nil {
517		return nil, nil, err
518	}
519
520	// Get the index itself
521	indexTxn := txn.readableIndex(table, indexSchema.Name)
522
523	// Do an exact lookup
524	if indexSchema.Unique && val != nil && indexSchema.Name == index {
525		watch, obj, ok := indexTxn.GetWatch(val)
526		if !ok {
527			return watch, nil, nil
528		}
529		return watch, obj, nil
530	}
531
532	// Handle non-unique index by using an iterator and getting the first value
533	iter := indexTxn.Root().Iterator()
534	watch := iter.SeekPrefixWatch(val)
535	_, value, _ := iter.Next()
536	return watch, value, nil
537}
538
539// LastWatch is used to return the last matching object for
540// the given constraints on the index along with the watch channel
541func (txn *Txn) LastWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) {
542	// Get the index value
543	indexSchema, val, err := txn.getIndexValue(table, index, args...)
544	if err != nil {
545		return nil, nil, err
546	}
547
548	// Get the index itself
549	indexTxn := txn.readableIndex(table, indexSchema.Name)
550
551	// Do an exact lookup
552	if indexSchema.Unique && val != nil && indexSchema.Name == index {
553		watch, obj, ok := indexTxn.GetWatch(val)
554		if !ok {
555			return watch, nil, nil
556		}
557		return watch, obj, nil
558	}
559
560	// Handle non-unique index by using an iterator and getting the last value
561	iter := indexTxn.Root().ReverseIterator()
562	watch := iter.SeekPrefixWatch(val)
563	_, value, _ := iter.Previous()
564	return watch, value, nil
565}
566
567// First is used to return the first matching object for
568// the given constraints on the index
569func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) {
570	_, val, err := txn.FirstWatch(table, index, args...)
571	return val, err
572}
573
574// Last is used to return the last matching object for
575// the given constraints on the index
576func (txn *Txn) Last(table, index string, args ...interface{}) (interface{}, error) {
577	_, val, err := txn.LastWatch(table, index, args...)
578	return val, err
579}
580
581// LongestPrefix is used to fetch the longest prefix match for the given
582// constraints on the index. Note that this will not work with the memdb
583// StringFieldIndex because it adds null terminators which prevent the
584// algorithm from correctly finding a match (it will get to right before the
585// null and fail to find a leaf node). This should only be used where the prefix
586// given is capable of matching indexed entries directly, which typically only
587// applies to a custom indexer. See the unit test for an example.
588func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) {
589	// Enforce that this only works on prefix indexes.
590	if !strings.HasSuffix(index, "_prefix") {
591		return nil, fmt.Errorf("must use '%s_prefix' on index", index)
592	}
593
594	// Get the index value.
595	indexSchema, val, err := txn.getIndexValue(table, index, args...)
596	if err != nil {
597		return nil, err
598	}
599
600	// This algorithm only makes sense against a unique index, otherwise the
601	// index keys will have the IDs appended to them.
602	if !indexSchema.Unique {
603		return nil, fmt.Errorf("index '%s' is not unique", index)
604	}
605
606	// Find the longest prefix match with the given index.
607	indexTxn := txn.readableIndex(table, indexSchema.Name)
608	if _, value, ok := indexTxn.Root().LongestPrefix(val); ok {
609		return value, nil
610	}
611	return nil, nil
612}
613
614// getIndexValue is used to get the IndexSchema and the value
615// used to scan the index given the parameters. This handles prefix based
616// scans when the index has the "_prefix" suffix. The index must support
617// prefix iteration.
618func (txn *Txn) getIndexValue(table, index string, args ...interface{}) (*IndexSchema, []byte, error) {
619	// Get the table schema
620	tableSchema, ok := txn.db.schema.Tables[table]
621	if !ok {
622		return nil, nil, fmt.Errorf("invalid table '%s'", table)
623	}
624
625	// Check for a prefix scan
626	prefixScan := false
627	if strings.HasSuffix(index, "_prefix") {
628		index = strings.TrimSuffix(index, "_prefix")
629		prefixScan = true
630	}
631
632	// Get the index schema
633	indexSchema, ok := tableSchema.Indexes[index]
634	if !ok {
635		return nil, nil, fmt.Errorf("invalid index '%s'", index)
636	}
637
638	// Hot-path for when there are no arguments
639	if len(args) == 0 {
640		return indexSchema, nil, nil
641	}
642
643	// Special case the prefix scanning
644	if prefixScan {
645		prefixIndexer, ok := indexSchema.Indexer.(PrefixIndexer)
646		if !ok {
647			return indexSchema, nil,
648				fmt.Errorf("index '%s' does not support prefix scanning", index)
649		}
650
651		val, err := prefixIndexer.PrefixFromArgs(args...)
652		if err != nil {
653			return indexSchema, nil, fmt.Errorf("index error: %v", err)
654		}
655		return indexSchema, val, err
656	}
657
658	// Get the exact match index
659	val, err := indexSchema.Indexer.FromArgs(args...)
660	if err != nil {
661		return indexSchema, nil, fmt.Errorf("index error: %v", err)
662	}
663	return indexSchema, val, err
664}
665
// ResultIterator is used to iterate over a list of results from a query on a table.
//
// When a ResultIterator is created from a write transaction, the results from
// Next will reflect a snapshot of the table at the time the ResultIterator is
// created.
// This means that calling Insert or Delete on a transaction while iterating is
// allowed, but the changes made by Insert or Delete will not be observed in the
// results returned from subsequent calls to Next. For example if an item is deleted
// from the index used by the iterator it will still be returned by Next. If an
// item is inserted into the index used by the iterator, it will not be returned
// by Next. However, an iterator created after a call to Insert or Delete will
// reflect the modifications.
//
// When a ResultIterator is created from a write transaction, and there are already
// modifications to the index used by the iterator, the modification cache of the
// index will be invalidated. This may result in some additional allocations if
// the same node in the index is modified again.
type ResultIterator interface {
	// WatchCh returns the watch channel associated with the iterator. For
	// iterators created by LowerBound and ReverseLowerBound the channel is nil.
	WatchCh() <-chan struct{}
	// Next returns the next result from the iterator. If there are no more results
	// nil is returned.
	Next() interface{}
}
689
690// Get is used to construct a ResultIterator over all the rows that match the
691// given constraints of an index.
692//
693// See the documentation for ResultIterator to understand the behaviour of the
694// returned ResultIterator.
695func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) {
696	indexIter, val, err := txn.getIndexIterator(table, index, args...)
697	if err != nil {
698		return nil, err
699	}
700
701	// Seek the iterator to the appropriate sub-set
702	watchCh := indexIter.SeekPrefixWatch(val)
703
704	// Create an iterator
705	iter := &radixIterator{
706		iter:    indexIter,
707		watchCh: watchCh,
708	}
709	return iter, nil
710}
711
712// GetReverse is used to construct a Reverse ResultIterator over all the
713// rows that match the given constraints of an index.
714// The returned ResultIterator's Next() will return the next Previous value.
715//
716// See the documentation for ResultIterator to understand the behaviour of the
717// returned ResultIterator.
718func (txn *Txn) GetReverse(table, index string, args ...interface{}) (ResultIterator, error) {
719	indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...)
720	if err != nil {
721		return nil, err
722	}
723
724	// Seek the iterator to the appropriate sub-set
725	watchCh := indexIter.SeekPrefixWatch(val)
726
727	// Create an iterator
728	iter := &radixReverseIterator{
729		iter:    indexIter,
730		watchCh: watchCh,
731	}
732	return iter, nil
733}
734
735// LowerBound is used to construct a ResultIterator over all the the range of
736// rows that have an index value greater than or equal to the provide args.
737// Calling this then iterating until the rows are larger than required allows
738// range scans within an index. It is not possible to watch the resulting
739// iterator since the radix tree doesn't efficiently allow watching on lower
740// bound changes. The WatchCh returned will be nill and so will block forever.
741//
742// See the documentation for ResultIterator to understand the behaviour of the
743// returned ResultIterator.
744func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
745	indexIter, val, err := txn.getIndexIterator(table, index, args...)
746	if err != nil {
747		return nil, err
748	}
749
750	// Seek the iterator to the appropriate sub-set
751	indexIter.SeekLowerBound(val)
752
753	// Create an iterator
754	iter := &radixIterator{
755		iter: indexIter,
756	}
757	return iter, nil
758}
759
760// ReverseLowerBound is used to construct a Reverse ResultIterator over all the
761// the range of rows that have an index value less than or equal to the
762// provide args.  Calling this then iterating until the rows are lower than
763// required allows range scans within an index. It is not possible to watch the
764// resulting iterator since the radix tree doesn't efficiently allow watching
765// on lower bound changes. The WatchCh returned will be nill and so will block
766// forever.
767//
768// See the documentation for ResultIterator to understand the behaviour of the
769// returned ResultIterator.
770func (txn *Txn) ReverseLowerBound(table, index string, args ...interface{}) (ResultIterator, error) {
771	indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...)
772	if err != nil {
773		return nil, err
774	}
775
776	// Seek the iterator to the appropriate sub-set
777	indexIter.SeekReverseLowerBound(val)
778
779	// Create an iterator
780	iter := &radixReverseIterator{
781		iter: indexIter,
782	}
783	return iter, nil
784}
785
// objectID is a tuple of table name and the raw internal id byte slice
// converted to a string. It's only converted to a string to make it comparable
// so this struct can be used as a map index. Changes uses it to group
// mutations that touch the same object.
type objectID struct {
	Table    string // table the object belongs to
	IndexVal string // primary key bytes of the object, as a string
}
793
// mutInfo stores metadata about mutations to allow collapsing multiple
// mutations to the same object into one.
type mutInfo struct {
	firstBefore interface{} // Before value of the first mutation of the object
	lastIdx     int         // index in the change list of the object's last mutation
}
800
801// Changes returns the set of object changes that have been made in the
802// transaction so far. If change tracking is not enabled it wil always return
803// nil. It can be called before or after Commit. If it is before Commit it will
804// return all changes made so far which may not be the same as the final
805// Changes. After abort it will always return nil. As with other Txn methods
806// it's not safe to call this from a different goroutine than the one making
807// mutations or committing the transaction. Mutations will appear in the order
808// they were performed in the transaction but multiple operations to the same
809// object will be collapsed so only the effective overall change to that object
810// is present. If transaction operations are dependent (e.g. copy object X to Y
811// then delete X) this might mean the set of mutations is incomplete to verify
812// history, but it is complete in that the net effect is preserved (Y got a new
813// value, X got removed).
814func (txn *Txn) Changes() Changes {
815	if txn.changes == nil {
816		return nil
817	}
818
819	// De-duplicate mutations by key so all take effect at the point of the last
820	// write but we keep the mutations in order.
821	dups := make(map[objectID]mutInfo)
822	for i, m := range txn.changes {
823		oid := objectID{
824			Table:    m.Table,
825			IndexVal: string(m.primaryKey),
826		}
827		// Store the latest mutation index for each key value
828		mi, ok := dups[oid]
829		if !ok {
830			// First entry for key, store the before value
831			mi.firstBefore = m.Before
832		}
833		mi.lastIdx = i
834		dups[oid] = mi
835	}
836	if len(dups) == len(txn.changes) {
837		// No duplicates found, fast path return it as is
838		return txn.changes
839	}
840
841	// Need to remove the duplicates
842	cs := make(Changes, 0, len(dups))
843	for i, m := range txn.changes {
844		oid := objectID{
845			Table:    m.Table,
846			IndexVal: string(m.primaryKey),
847		}
848		mi := dups[oid]
849		if mi.lastIdx == i {
850			// This was the latest value for this key copy it with the before value in
851			// case it's different. Note that m is not a pointer so we are not
852			// modifying the txn.changeSet here - it's already a copy.
853			m.Before = mi.firstBefore
854
855			// Edge case - if the object was inserted and then eventually deleted in
856			// the same transaction, then the net affect on that key is a no-op. Don't
857			// emit a mutation with nil for before and after as it's meaningless and
858			// might violate expectations and cause a panic in code that assumes at
859			// least one must be set.
860			if m.Before == nil && m.After == nil {
861				continue
862			}
863			cs = append(cs, m)
864		}
865	}
866	// Store the de-duped version in case this is called again
867	txn.changes = cs
868	return cs
869}
870
871func (txn *Txn) getIndexIterator(table, index string, args ...interface{}) (*iradix.Iterator, []byte, error) {
872	// Get the index value to scan
873	indexSchema, val, err := txn.getIndexValue(table, index, args...)
874	if err != nil {
875		return nil, nil, err
876	}
877
878	// Get the index itself
879	indexTxn := txn.readableIndex(table, indexSchema.Name)
880	indexRoot := indexTxn.Root()
881
882	// Get an iterator over the index
883	indexIter := indexRoot.Iterator()
884	return indexIter, val, nil
885}
886
887func (txn *Txn) getIndexIteratorReverse(table, index string, args ...interface{}) (*iradix.ReverseIterator, []byte, error) {
888	// Get the index value to scan
889	indexSchema, val, err := txn.getIndexValue(table, index, args...)
890	if err != nil {
891		return nil, nil, err
892	}
893
894	// Get the index itself
895	indexTxn := txn.readableIndex(table, indexSchema.Name)
896	indexRoot := indexTxn.Root()
897
898	// Get an interator over the index
899	indexIter := indexRoot.ReverseIterator()
900	return indexIter, val, nil
901}
902
903// Defer is used to push a new arbitrary function onto a stack which
904// gets called when a transaction is committed and finished. Deferred
905// functions are called in LIFO order, and only invoked at the end of
906// write transactions.
907func (txn *Txn) Defer(fn func()) {
908	txn.after = append(txn.after, fn)
909}
910
911// radixIterator is used to wrap an underlying iradix iterator.
912// This is much more efficient than a sliceIterator as we are not
913// materializing the entire view.
914type radixIterator struct {
915	iter    *iradix.Iterator
916	watchCh <-chan struct{}
917}
918
919func (r *radixIterator) WatchCh() <-chan struct{} {
920	return r.watchCh
921}
922
923func (r *radixIterator) Next() interface{} {
924	_, value, ok := r.iter.Next()
925	if !ok {
926		return nil
927	}
928	return value
929}
930
931type radixReverseIterator struct {
932	iter    *iradix.ReverseIterator
933	watchCh <-chan struct{}
934}
935
936func (r *radixReverseIterator) Next() interface{} {
937	_, value, ok := r.iter.Previous()
938	if !ok {
939		return nil
940	}
941	return value
942}
943
944func (r *radixReverseIterator) WatchCh() <-chan struct{} {
945	return r.watchCh
946}
947
948// Snapshot creates a snapshot of the current state of the transaction.
949// Returns a new read-only transaction or nil if the transaction is already
950// aborted or committed.
951func (txn *Txn) Snapshot() *Txn {
952	if txn.rootTxn == nil {
953		return nil
954	}
955
956	snapshot := &Txn{
957		db:      txn.db,
958		rootTxn: txn.rootTxn.Clone(),
959	}
960
961	// Commit sub-transactions into the snapshot
962	for key, subTxn := range txn.modified {
963		path := indexPath(key.Table, key.Index)
964		final := subTxn.CommitOnly()
965		snapshot.rootTxn.Insert(path, final)
966	}
967
968	return snapshot
969}
970