1package memdb 2 3import ( 4 "bytes" 5 "fmt" 6 "strings" 7 "sync/atomic" 8 "unsafe" 9 10 iradix "github.com/hashicorp/go-immutable-radix" 11) 12 13const ( 14 id = "id" 15) 16 17var ( 18 // ErrNotFound is returned when the requested item is not found 19 ErrNotFound = fmt.Errorf("not found") 20) 21 22// tableIndex is a tuple of (Table, Index) used for lookups 23type tableIndex struct { 24 Table string 25 Index string 26} 27 28// Txn is a transaction against a MemDB. 29// This can be a read or write transaction. 30type Txn struct { 31 db *MemDB 32 write bool 33 rootTxn *iradix.Txn 34 after []func() 35 36 // changes is used to track the changes performed during the transaction. If 37 // it is nil at transaction start then changes are not tracked. 38 changes Changes 39 40 modified map[tableIndex]*iradix.Txn 41} 42 43// TrackChanges enables change tracking for the transaction. If called at any 44// point before commit, subsequent mutations will be recorded and can be 45// retrieved using ChangeSet. Once this has been called on a transaction it 46// can't be unset. As with other Txn methods it's not safe to call this from a 47// different goroutine than the one making mutations or committing the 48// transaction. 49func (txn *Txn) TrackChanges() { 50 if txn.changes == nil { 51 txn.changes = make(Changes, 0, 1) 52 } 53} 54 55// readableIndex returns a transaction usable for reading the given index in a 56// table. If the transaction is a write transaction with modifications, a clone of the 57// modified index will be returned. 58func (txn *Txn) readableIndex(table, index string) *iradix.Txn { 59 // Look for existing transaction 60 if txn.write && txn.modified != nil { 61 key := tableIndex{table, index} 62 exist, ok := txn.modified[key] 63 if ok { 64 return exist.Clone() 65 } 66 } 67 68 // Create a read transaction 69 path := indexPath(table, index) 70 raw, _ := txn.rootTxn.Get(path) 71 indexTxn := raw.(*iradix.Tree).Txn() 72 return indexTxn 73} 74 75// writableIndex returns a transaction usable for modifying the 76// given index in a table. 77func (txn *Txn) writableIndex(table, index string) *iradix.Txn { 78 if txn.modified == nil { 79 txn.modified = make(map[tableIndex]*iradix.Txn) 80 } 81 82 // Look for existing transaction 83 key := tableIndex{table, index} 84 exist, ok := txn.modified[key] 85 if ok { 86 return exist 87 } 88 89 // Start a new transaction 90 path := indexPath(table, index) 91 raw, _ := txn.rootTxn.Get(path) 92 indexTxn := raw.(*iradix.Tree).Txn() 93 94 // If we are the primary DB, enable mutation tracking. Snapshots should 95 // not notify, otherwise we will trigger watches on the primary DB when 96 // the writes will not be visible. 97 indexTxn.TrackMutate(txn.db.primary) 98 99 // Keep this open for the duration of the txn 100 txn.modified[key] = indexTxn 101 return indexTxn 102} 103 104// Abort is used to cancel this transaction. 105// This is a noop for read transactions. 106func (txn *Txn) Abort() { 107 // Noop for a read transaction 108 if !txn.write { 109 return 110 } 111 112 // Check if already aborted or committed 113 if txn.rootTxn == nil { 114 return 115 } 116 117 // Clear the txn 118 txn.rootTxn = nil 119 txn.modified = nil 120 txn.changes = nil 121 122 // Release the writer lock since this is invalid 123 txn.db.writer.Unlock() 124} 125 126// Commit is used to finalize this transaction. 127// This is a noop for read transactions. 128func (txn *Txn) Commit() { 129 // Noop for a read transaction 130 if !txn.write { 131 return 132 } 133 134 // Check if already aborted or committed 135 if txn.rootTxn == nil { 136 return 137 } 138 139 // Commit each sub-transaction scoped to (table, index) 140 for key, subTxn := range txn.modified { 141 path := indexPath(key.Table, key.Index) 142 final := subTxn.CommitOnly() 143 txn.rootTxn.Insert(path, final) 144 } 145 146 // Update the root of the DB 147 newRoot := txn.rootTxn.CommitOnly() 148 atomic.StorePointer(&txn.db.root, unsafe.Pointer(newRoot)) 149 150 // Now issue all of the mutation updates (this is safe to call 151 // even if mutation tracking isn't enabled); we do this after 152 // the root pointer is swapped so that waking responders will 153 // see the new state. 154 for _, subTxn := range txn.modified { 155 subTxn.Notify() 156 } 157 txn.rootTxn.Notify() 158 159 // Clear the txn 160 txn.rootTxn = nil 161 txn.modified = nil 162 163 // Release the writer lock since this is invalid 164 txn.db.writer.Unlock() 165 166 // Run the deferred functions, if any 167 for i := len(txn.after); i > 0; i-- { 168 fn := txn.after[i-1] 169 fn() 170 } 171} 172 173// Insert is used to add or update an object into the given table 174func (txn *Txn) Insert(table string, obj interface{}) error { 175 if !txn.write { 176 return fmt.Errorf("cannot insert in read-only transaction") 177 } 178 179 // Get the table schema 180 tableSchema, ok := txn.db.schema.Tables[table] 181 if !ok { 182 return fmt.Errorf("invalid table '%s'", table) 183 } 184 185 // Get the primary ID of the object 186 idSchema := tableSchema.Indexes[id] 187 idIndexer := idSchema.Indexer.(SingleIndexer) 188 ok, idVal, err := idIndexer.FromObject(obj) 189 if err != nil { 190 return fmt.Errorf("failed to build primary index: %v", err) 191 } 192 if !ok { 193 return fmt.Errorf("object missing primary index") 194 } 195 196 // Lookup the object by ID first, to see if this is an update 197 idTxn := txn.writableIndex(table, id) 198 existing, update := idTxn.Get(idVal) 199 200 // On an update, there is an existing object with the given 201 // primary ID. We do the update by deleting the current object 202 // and inserting the new object. 203 for name, indexSchema := range tableSchema.Indexes { 204 indexTxn := txn.writableIndex(table, name) 205 206 // Determine the new index value 207 var ( 208 ok bool 209 vals [][]byte 210 err error 211 ) 212 switch indexer := indexSchema.Indexer.(type) { 213 case SingleIndexer: 214 var val []byte 215 ok, val, err = indexer.FromObject(obj) 216 vals = [][]byte{val} 217 case MultiIndexer: 218 ok, vals, err = indexer.FromObject(obj) 219 } 220 if err != nil { 221 return fmt.Errorf("failed to build index '%s': %v", name, err) 222 } 223 224 // Handle non-unique index by computing a unique index. 225 // This is done by appending the primary key which must 226 // be unique anyways. 227 if ok && !indexSchema.Unique { 228 for i := range vals { 229 vals[i] = append(vals[i], idVal...) 230 } 231 } 232 233 // Handle the update by deleting from the index first 234 if update { 235 var ( 236 okExist bool 237 valsExist [][]byte 238 err error 239 ) 240 switch indexer := indexSchema.Indexer.(type) { 241 case SingleIndexer: 242 var valExist []byte 243 okExist, valExist, err = indexer.FromObject(existing) 244 valsExist = [][]byte{valExist} 245 case MultiIndexer: 246 okExist, valsExist, err = indexer.FromObject(existing) 247 } 248 if err != nil { 249 return fmt.Errorf("failed to build index '%s': %v", name, err) 250 } 251 if okExist { 252 for i, valExist := range valsExist { 253 // Handle non-unique index by computing a unique index. 254 // This is done by appending the primary key which must 255 // be unique anyways. 256 if !indexSchema.Unique { 257 valExist = append(valExist, idVal...) 258 } 259 260 // If we are writing to the same index with the same value, 261 // we can avoid the delete as the insert will overwrite the 262 // value anyways. 263 if i >= len(vals) || !bytes.Equal(valExist, vals[i]) { 264 indexTxn.Delete(valExist) 265 } 266 } 267 } 268 } 269 270 // If there is no index value, either this is an error or an expected 271 // case and we can skip updating 272 if !ok { 273 if indexSchema.AllowMissing { 274 continue 275 } else { 276 return fmt.Errorf("missing value for index '%s'", name) 277 } 278 } 279 280 // Update the value of the index 281 for _, val := range vals { 282 indexTxn.Insert(val, obj) 283 } 284 } 285 if txn.changes != nil { 286 txn.changes = append(txn.changes, Change{ 287 Table: table, 288 Before: existing, // might be nil on a create 289 After: obj, 290 primaryKey: idVal, 291 }) 292 } 293 return nil 294} 295 296// Delete is used to delete a single object from the given table 297// This object must already exist in the table 298func (txn *Txn) Delete(table string, obj interface{}) error { 299 if !txn.write { 300 return fmt.Errorf("cannot delete in read-only transaction") 301 } 302 303 // Get the table schema 304 tableSchema, ok := txn.db.schema.Tables[table] 305 if !ok { 306 return fmt.Errorf("invalid table '%s'", table) 307 } 308 309 // Get the primary ID of the object 310 idSchema := tableSchema.Indexes[id] 311 idIndexer := idSchema.Indexer.(SingleIndexer) 312 ok, idVal, err := idIndexer.FromObject(obj) 313 if err != nil { 314 return fmt.Errorf("failed to build primary index: %v", err) 315 } 316 if !ok { 317 return fmt.Errorf("object missing primary index") 318 } 319 320 // Lookup the object by ID first, check fi we should continue 321 idTxn := txn.writableIndex(table, id) 322 existing, ok := idTxn.Get(idVal) 323 if !ok { 324 return ErrNotFound 325 } 326 327 // Remove the object from all the indexes 328 for name, indexSchema := range tableSchema.Indexes { 329 indexTxn := txn.writableIndex(table, name) 330 331 // Handle the update by deleting from the index first 332 var ( 333 ok bool 334 vals [][]byte 335 err error 336 ) 337 switch indexer := indexSchema.Indexer.(type) { 338 case SingleIndexer: 339 var val []byte 340 ok, val, err = indexer.FromObject(existing) 341 vals = [][]byte{val} 342 case MultiIndexer: 343 ok, vals, err = indexer.FromObject(existing) 344 } 345 if err != nil { 346 return fmt.Errorf("failed to build index '%s': %v", name, err) 347 } 348 if ok { 349 // Handle non-unique index by computing a unique index. 350 // This is done by appending the primary key which must 351 // be unique anyways. 352 for _, val := range vals { 353 if !indexSchema.Unique { 354 val = append(val, idVal...) 355 } 356 indexTxn.Delete(val) 357 } 358 } 359 } 360 if txn.changes != nil { 361 txn.changes = append(txn.changes, Change{ 362 Table: table, 363 Before: existing, 364 After: nil, // Now nil indicates deletion 365 primaryKey: idVal, 366 }) 367 } 368 return nil 369} 370 371// DeletePrefix is used to delete an entire subtree based on a prefix. 372// The given index must be a prefix index, and will be used to perform a scan and enumerate the set of objects to delete. 373// These will be removed from all other indexes, and then a special prefix operation will delete the objects from the given index in an efficient subtree delete operation. 374// This is useful when you have a very large number of objects indexed by the given index, along with a much smaller number of entries in the other indexes for those objects. 375func (txn *Txn) DeletePrefix(table string, prefix_index string, prefix string) (bool, error) { 376 if !txn.write { 377 return false, fmt.Errorf("cannot delete in read-only transaction") 378 } 379 380 if !strings.HasSuffix(prefix_index, "_prefix") { 381 return false, fmt.Errorf("Index name for DeletePrefix must be a prefix index, Got %v ", prefix_index) 382 } 383 384 deletePrefixIndex := strings.TrimSuffix(prefix_index, "_prefix") 385 386 // Get an iterator over all of the keys with the given prefix. 387 entries, err := txn.Get(table, prefix_index, prefix) 388 if err != nil { 389 return false, fmt.Errorf("failed kvs lookup: %s", err) 390 } 391 // Get the table schema 392 tableSchema, ok := txn.db.schema.Tables[table] 393 if !ok { 394 return false, fmt.Errorf("invalid table '%s'", table) 395 } 396 397 foundAny := false 398 for entry := entries.Next(); entry != nil; entry = entries.Next() { 399 if !foundAny { 400 foundAny = true 401 } 402 // Get the primary ID of the object 403 idSchema := tableSchema.Indexes[id] 404 idIndexer := idSchema.Indexer.(SingleIndexer) 405 ok, idVal, err := idIndexer.FromObject(entry) 406 if err != nil { 407 return false, fmt.Errorf("failed to build primary index: %v", err) 408 } 409 if !ok { 410 return false, fmt.Errorf("object missing primary index") 411 } 412 if txn.changes != nil { 413 // Record the deletion 414 idTxn := txn.writableIndex(table, id) 415 existing, ok := idTxn.Get(idVal) 416 if ok { 417 txn.changes = append(txn.changes, Change{ 418 Table: table, 419 Before: existing, 420 After: nil, // Now nil indicates deletion 421 primaryKey: idVal, 422 }) 423 } 424 } 425 // Remove the object from all the indexes except the given prefix index 426 for name, indexSchema := range tableSchema.Indexes { 427 if name == deletePrefixIndex { 428 continue 429 } 430 indexTxn := txn.writableIndex(table, name) 431 432 // Handle the update by deleting from the index first 433 var ( 434 ok bool 435 vals [][]byte 436 err error 437 ) 438 switch indexer := indexSchema.Indexer.(type) { 439 case SingleIndexer: 440 var val []byte 441 ok, val, err = indexer.FromObject(entry) 442 vals = [][]byte{val} 443 case MultiIndexer: 444 ok, vals, err = indexer.FromObject(entry) 445 } 446 if err != nil { 447 return false, fmt.Errorf("failed to build index '%s': %v", name, err) 448 } 449 450 if ok { 451 // Handle non-unique index by computing a unique index. 452 // This is done by appending the primary key which must 453 // be unique anyways. 454 for _, val := range vals { 455 if !indexSchema.Unique { 456 val = append(val, idVal...) 457 } 458 indexTxn.Delete(val) 459 } 460 } 461 } 462 463 } 464 if foundAny { 465 indexTxn := txn.writableIndex(table, deletePrefixIndex) 466 ok = indexTxn.DeletePrefix([]byte(prefix)) 467 if !ok { 468 panic(fmt.Errorf("prefix %v matched some entries but DeletePrefix did not delete any ", prefix)) 469 } 470 return true, nil 471 } 472 return false, nil 473} 474 475// DeleteAll is used to delete all the objects in a given table 476// matching the constraints on the index 477func (txn *Txn) DeleteAll(table, index string, args ...interface{}) (int, error) { 478 if !txn.write { 479 return 0, fmt.Errorf("cannot delete in read-only transaction") 480 } 481 482 // Get all the objects 483 iter, err := txn.Get(table, index, args...) 484 if err != nil { 485 return 0, err 486 } 487 488 // Put them into a slice so there are no safety concerns while actually 489 // performing the deletes 490 var objs []interface{} 491 for { 492 obj := iter.Next() 493 if obj == nil { 494 break 495 } 496 497 objs = append(objs, obj) 498 } 499 500 // Do the deletes 501 num := 0 502 for _, obj := range objs { 503 if err := txn.Delete(table, obj); err != nil { 504 return num, err 505 } 506 num++ 507 } 508 return num, nil 509} 510 511// FirstWatch is used to return the first matching object for 512// the given constraints on the index along with the watch channel 513func (txn *Txn) FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) { 514 // Get the index value 515 indexSchema, val, err := txn.getIndexValue(table, index, args...) 516 if err != nil { 517 return nil, nil, err 518 } 519 520 // Get the index itself 521 indexTxn := txn.readableIndex(table, indexSchema.Name) 522 523 // Do an exact lookup 524 if indexSchema.Unique && val != nil && indexSchema.Name == index { 525 watch, obj, ok := indexTxn.GetWatch(val) 526 if !ok { 527 return watch, nil, nil 528 } 529 return watch, obj, nil 530 } 531 532 // Handle non-unique index by using an iterator and getting the first value 533 iter := indexTxn.Root().Iterator() 534 watch := iter.SeekPrefixWatch(val) 535 _, value, _ := iter.Next() 536 return watch, value, nil 537} 538 539// LastWatch is used to return the last matching object for 540// the given constraints on the index along with the watch channel 541func (txn *Txn) LastWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error) { 542 // Get the index value 543 indexSchema, val, err := txn.getIndexValue(table, index, args...) 544 if err != nil { 545 return nil, nil, err 546 } 547 548 // Get the index itself 549 indexTxn := txn.readableIndex(table, indexSchema.Name) 550 551 // Do an exact lookup 552 if indexSchema.Unique && val != nil && indexSchema.Name == index { 553 watch, obj, ok := indexTxn.GetWatch(val) 554 if !ok { 555 return watch, nil, nil 556 } 557 return watch, obj, nil 558 } 559 560 // Handle non-unique index by using an iterator and getting the last value 561 iter := indexTxn.Root().ReverseIterator() 562 watch := iter.SeekPrefixWatch(val) 563 _, value, _ := iter.Previous() 564 return watch, value, nil 565} 566 567// First is used to return the first matching object for 568// the given constraints on the index 569func (txn *Txn) First(table, index string, args ...interface{}) (interface{}, error) { 570 _, val, err := txn.FirstWatch(table, index, args...) 571 return val, err 572} 573 574// Last is used to return the last matching object for 575// the given constraints on the index 576func (txn *Txn) Last(table, index string, args ...interface{}) (interface{}, error) { 577 _, val, err := txn.LastWatch(table, index, args...) 578 return val, err 579} 580 581// LongestPrefix is used to fetch the longest prefix match for the given 582// constraints on the index. Note that this will not work with the memdb 583// StringFieldIndex because it adds null terminators which prevent the 584// algorithm from correctly finding a match (it will get to right before the 585// null and fail to find a leaf node). This should only be used where the prefix 586// given is capable of matching indexed entries directly, which typically only 587// applies to a custom indexer. See the unit test for an example. 588func (txn *Txn) LongestPrefix(table, index string, args ...interface{}) (interface{}, error) { 589 // Enforce that this only works on prefix indexes. 590 if !strings.HasSuffix(index, "_prefix") { 591 return nil, fmt.Errorf("must use '%s_prefix' on index", index) 592 } 593 594 // Get the index value. 595 indexSchema, val, err := txn.getIndexValue(table, index, args...) 596 if err != nil { 597 return nil, err 598 } 599 600 // This algorithm only makes sense against a unique index, otherwise the 601 // index keys will have the IDs appended to them. 602 if !indexSchema.Unique { 603 return nil, fmt.Errorf("index '%s' is not unique", index) 604 } 605 606 // Find the longest prefix match with the given index. 607 indexTxn := txn.readableIndex(table, indexSchema.Name) 608 if _, value, ok := indexTxn.Root().LongestPrefix(val); ok { 609 return value, nil 610 } 611 return nil, nil 612} 613 614// getIndexValue is used to get the IndexSchema and the value 615// used to scan the index given the parameters. This handles prefix based 616// scans when the index has the "_prefix" suffix. The index must support 617// prefix iteration. 618func (txn *Txn) getIndexValue(table, index string, args ...interface{}) (*IndexSchema, []byte, error) { 619 // Get the table schema 620 tableSchema, ok := txn.db.schema.Tables[table] 621 if !ok { 622 return nil, nil, fmt.Errorf("invalid table '%s'", table) 623 } 624 625 // Check for a prefix scan 626 prefixScan := false 627 if strings.HasSuffix(index, "_prefix") { 628 index = strings.TrimSuffix(index, "_prefix") 629 prefixScan = true 630 } 631 632 // Get the index schema 633 indexSchema, ok := tableSchema.Indexes[index] 634 if !ok { 635 return nil, nil, fmt.Errorf("invalid index '%s'", index) 636 } 637 638 // Hot-path for when there are no arguments 639 if len(args) == 0 { 640 return indexSchema, nil, nil 641 } 642 643 // Special case the prefix scanning 644 if prefixScan { 645 prefixIndexer, ok := indexSchema.Indexer.(PrefixIndexer) 646 if !ok { 647 return indexSchema, nil, 648 fmt.Errorf("index '%s' does not support prefix scanning", index) 649 } 650 651 val, err := prefixIndexer.PrefixFromArgs(args...) 652 if err != nil { 653 return indexSchema, nil, fmt.Errorf("index error: %v", err) 654 } 655 return indexSchema, val, err 656 } 657 658 // Get the exact match index 659 val, err := indexSchema.Indexer.FromArgs(args...) 660 if err != nil { 661 return indexSchema, nil, fmt.Errorf("index error: %v", err) 662 } 663 return indexSchema, val, err 664} 665 666// ResultIterator is used to iterate over a list of results from a query on a table. 667// 668// When a ResultIterator is created from a write transaction, the results from 669// Next will reflect a snapshot of the table at the time the ResultIterator is 670// created. 671// This means that calling Insert or Delete on a transaction while iterating is 672// allowed, but the changes made by Insert or Delete will not be observed in the 673// results returned from subsequent calls to Next. For example if an item is deleted 674// from the index used by the iterator it will still be returned by Next. If an 675// item is inserted into the index used by the iterator, it will not be returned 676// by Next. However, an iterator created after a call to Insert or Delete will 677// reflect the modifications. 678// 679// When a ResultIterator is created from a write transaction, and there are already 680// modifications to the index used by the iterator, the modification cache of the 681// index will be invalidated. This may result in some additional allocations if 682// the same node in the index is modified again. 683type ResultIterator interface { 684 WatchCh() <-chan struct{} 685 // Next returns the next result from the iterator. If there are no more results 686 // nil is returned. 687 Next() interface{} 688} 689 690// Get is used to construct a ResultIterator over all the rows that match the 691// given constraints of an index. 692// 693// See the documentation for ResultIterator to understand the behaviour of the 694// returned ResultIterator. 695func (txn *Txn) Get(table, index string, args ...interface{}) (ResultIterator, error) { 696 indexIter, val, err := txn.getIndexIterator(table, index, args...) 697 if err != nil { 698 return nil, err 699 } 700 701 // Seek the iterator to the appropriate sub-set 702 watchCh := indexIter.SeekPrefixWatch(val) 703 704 // Create an iterator 705 iter := &radixIterator{ 706 iter: indexIter, 707 watchCh: watchCh, 708 } 709 return iter, nil 710} 711 712// GetReverse is used to construct a Reverse ResultIterator over all the 713// rows that match the given constraints of an index. 714// The returned ResultIterator's Next() will return the next Previous value. 715// 716// See the documentation for ResultIterator to understand the behaviour of the 717// returned ResultIterator. 718func (txn *Txn) GetReverse(table, index string, args ...interface{}) (ResultIterator, error) { 719 indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...) 720 if err != nil { 721 return nil, err 722 } 723 724 // Seek the iterator to the appropriate sub-set 725 watchCh := indexIter.SeekPrefixWatch(val) 726 727 // Create an iterator 728 iter := &radixReverseIterator{ 729 iter: indexIter, 730 watchCh: watchCh, 731 } 732 return iter, nil 733} 734 735// LowerBound is used to construct a ResultIterator over all the the range of 736// rows that have an index value greater than or equal to the provide args. 737// Calling this then iterating until the rows are larger than required allows 738// range scans within an index. It is not possible to watch the resulting 739// iterator since the radix tree doesn't efficiently allow watching on lower 740// bound changes. The WatchCh returned will be nill and so will block forever. 741// 742// See the documentation for ResultIterator to understand the behaviour of the 743// returned ResultIterator. 744func (txn *Txn) LowerBound(table, index string, args ...interface{}) (ResultIterator, error) { 745 indexIter, val, err := txn.getIndexIterator(table, index, args...) 746 if err != nil { 747 return nil, err 748 } 749 750 // Seek the iterator to the appropriate sub-set 751 indexIter.SeekLowerBound(val) 752 753 // Create an iterator 754 iter := &radixIterator{ 755 iter: indexIter, 756 } 757 return iter, nil 758} 759 760// ReverseLowerBound is used to construct a Reverse ResultIterator over all the 761// the range of rows that have an index value less than or equal to the 762// provide args. Calling this then iterating until the rows are lower than 763// required allows range scans within an index. It is not possible to watch the 764// resulting iterator since the radix tree doesn't efficiently allow watching 765// on lower bound changes. The WatchCh returned will be nill and so will block 766// forever. 767// 768// See the documentation for ResultIterator to understand the behaviour of the 769// returned ResultIterator. 770func (txn *Txn) ReverseLowerBound(table, index string, args ...interface{}) (ResultIterator, error) { 771 indexIter, val, err := txn.getIndexIteratorReverse(table, index, args...) 772 if err != nil { 773 return nil, err 774 } 775 776 // Seek the iterator to the appropriate sub-set 777 indexIter.SeekReverseLowerBound(val) 778 779 // Create an iterator 780 iter := &radixReverseIterator{ 781 iter: indexIter, 782 } 783 return iter, nil 784} 785 786// objectID is a tuple of table name and the raw internal id byte slice 787// converted to a string. It's only converted to a string to make it comparable 788// so this struct can be used as a map index. 789type objectID struct { 790 Table string 791 IndexVal string 792} 793 794// mutInfo stores metadata about mutations to allow collapsing multiple 795// mutations to the same object into one. 796type mutInfo struct { 797 firstBefore interface{} 798 lastIdx int 799} 800 801// Changes returns the set of object changes that have been made in the 802// transaction so far. If change tracking is not enabled it wil always return 803// nil. It can be called before or after Commit. If it is before Commit it will 804// return all changes made so far which may not be the same as the final 805// Changes. After abort it will always return nil. As with other Txn methods 806// it's not safe to call this from a different goroutine than the one making 807// mutations or committing the transaction. Mutations will appear in the order 808// they were performed in the transaction but multiple operations to the same 809// object will be collapsed so only the effective overall change to that object 810// is present. If transaction operations are dependent (e.g. copy object X to Y 811// then delete X) this might mean the set of mutations is incomplete to verify 812// history, but it is complete in that the net effect is preserved (Y got a new 813// value, X got removed). 814func (txn *Txn) Changes() Changes { 815 if txn.changes == nil { 816 return nil 817 } 818 819 // De-duplicate mutations by key so all take effect at the point of the last 820 // write but we keep the mutations in order. 821 dups := make(map[objectID]mutInfo) 822 for i, m := range txn.changes { 823 oid := objectID{ 824 Table: m.Table, 825 IndexVal: string(m.primaryKey), 826 } 827 // Store the latest mutation index for each key value 828 mi, ok := dups[oid] 829 if !ok { 830 // First entry for key, store the before value 831 mi.firstBefore = m.Before 832 } 833 mi.lastIdx = i 834 dups[oid] = mi 835 } 836 if len(dups) == len(txn.changes) { 837 // No duplicates found, fast path return it as is 838 return txn.changes 839 } 840 841 // Need to remove the duplicates 842 cs := make(Changes, 0, len(dups)) 843 for i, m := range txn.changes { 844 oid := objectID{ 845 Table: m.Table, 846 IndexVal: string(m.primaryKey), 847 } 848 mi := dups[oid] 849 if mi.lastIdx == i { 850 // This was the latest value for this key copy it with the before value in 851 // case it's different. Note that m is not a pointer so we are not 852 // modifying the txn.changeSet here - it's already a copy. 853 m.Before = mi.firstBefore 854 855 // Edge case - if the object was inserted and then eventually deleted in 856 // the same transaction, then the net affect on that key is a no-op. Don't 857 // emit a mutation with nil for before and after as it's meaningless and 858 // might violate expectations and cause a panic in code that assumes at 859 // least one must be set. 860 if m.Before == nil && m.After == nil { 861 continue 862 } 863 cs = append(cs, m) 864 } 865 } 866 // Store the de-duped version in case this is called again 867 txn.changes = cs 868 return cs 869} 870 871func (txn *Txn) getIndexIterator(table, index string, args ...interface{}) (*iradix.Iterator, []byte, error) { 872 // Get the index value to scan 873 indexSchema, val, err := txn.getIndexValue(table, index, args...) 874 if err != nil { 875 return nil, nil, err 876 } 877 878 // Get the index itself 879 indexTxn := txn.readableIndex(table, indexSchema.Name) 880 indexRoot := indexTxn.Root() 881 882 // Get an iterator over the index 883 indexIter := indexRoot.Iterator() 884 return indexIter, val, nil 885} 886 887func (txn *Txn) getIndexIteratorReverse(table, index string, args ...interface{}) (*iradix.ReverseIterator, []byte, error) { 888 // Get the index value to scan 889 indexSchema, val, err := txn.getIndexValue(table, index, args...) 890 if err != nil { 891 return nil, nil, err 892 } 893 894 // Get the index itself 895 indexTxn := txn.readableIndex(table, indexSchema.Name) 896 indexRoot := indexTxn.Root() 897 898 // Get an interator over the index 899 indexIter := indexRoot.ReverseIterator() 900 return indexIter, val, nil 901} 902 903// Defer is used to push a new arbitrary function onto a stack which 904// gets called when a transaction is committed and finished. Deferred 905// functions are called in LIFO order, and only invoked at the end of 906// write transactions. 907func (txn *Txn) Defer(fn func()) { 908 txn.after = append(txn.after, fn) 909} 910 911// radixIterator is used to wrap an underlying iradix iterator. 912// This is much more efficient than a sliceIterator as we are not 913// materializing the entire view. 914type radixIterator struct { 915 iter *iradix.Iterator 916 watchCh <-chan struct{} 917} 918 919func (r *radixIterator) WatchCh() <-chan struct{} { 920 return r.watchCh 921} 922 923func (r *radixIterator) Next() interface{} { 924 _, value, ok := r.iter.Next() 925 if !ok { 926 return nil 927 } 928 return value 929} 930 931type radixReverseIterator struct { 932 iter *iradix.ReverseIterator 933 watchCh <-chan struct{} 934} 935 936func (r *radixReverseIterator) Next() interface{} { 937 _, value, ok := r.iter.Previous() 938 if !ok { 939 return nil 940 } 941 return value 942} 943 944func (r *radixReverseIterator) WatchCh() <-chan struct{} { 945 return r.watchCh 946} 947 948// Snapshot creates a snapshot of the current state of the transaction. 949// Returns a new read-only transaction or nil if the transaction is already 950// aborted or committed. 951func (txn *Txn) Snapshot() *Txn { 952 if txn.rootTxn == nil { 953 return nil 954 } 955 956 snapshot := &Txn{ 957 db: txn.db, 958 rootTxn: txn.rootTxn.Clone(), 959 } 960 961 // Commit sub-transactions into the snapshot 962 for key, subTxn := range txn.modified { 963 path := indexPath(key.Table, key.Index) 964 final := subTxn.CommitOnly() 965 snapshot.rootTxn.Insert(path, final) 966 } 967 968 return snapshot 969} 970