1// Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package leveldb 8 9import ( 10 "errors" 11 "sync" 12 "time" 13 14 "github.com/syndtr/goleveldb/leveldb/iterator" 15 "github.com/syndtr/goleveldb/leveldb/opt" 16 "github.com/syndtr/goleveldb/leveldb/util" 17) 18 19var errTransactionDone = errors.New("leveldb: transaction already closed") 20 21// Transaction is the transaction handle. 22type Transaction struct { 23 db *DB 24 lk sync.RWMutex 25 seq uint64 26 mem *memDB 27 tables tFiles 28 ikScratch []byte 29 rec sessionRecord 30 stats cStatStaging 31 closed bool 32} 33 34// Get gets the value for the given key. It returns ErrNotFound if the 35// DB does not contains the key. 36// 37// The returned slice is its own copy, it is safe to modify the contents 38// of the returned slice. 39// It is safe to modify the contents of the argument after Get returns. 40func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) { 41 tr.lk.RLock() 42 defer tr.lk.RUnlock() 43 if tr.closed { 44 return nil, errTransactionDone 45 } 46 return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro) 47} 48 49// Has returns true if the DB does contains the given key. 50// 51// It is safe to modify the contents of the argument after Has returns. 52func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) { 53 tr.lk.RLock() 54 defer tr.lk.RUnlock() 55 if tr.closed { 56 return false, errTransactionDone 57 } 58 return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro) 59} 60 61// NewIterator returns an iterator for the latest snapshot of the transaction. 62// The returned iterator is not safe for concurrent use, but it is safe to use 63// multiple iterators concurrently, with each in a dedicated goroutine. 64// It is also safe to use an iterator concurrently while writes to the 65// transaction. The resultant key/value pairs are guaranteed to be consistent. 66// 67// Slice allows slicing the iterator to only contains keys in the given 68// range. A nil Range.Start is treated as a key before all keys in the 69// DB. And a nil Range.Limit is treated as a key after all keys in 70// the DB. 71// 72// WARNING: Any slice returned by interator (e.g. slice returned by calling 73// Iterator.Key() or Iterator.Key() methods), its content should not be modified 74// unless noted otherwise. 75// 76// The iterator must be released after use, by calling Release method. 77// 78// Also read Iterator documentation of the leveldb/iterator package. 79func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { 80 tr.lk.RLock() 81 defer tr.lk.RUnlock() 82 if tr.closed { 83 return iterator.NewEmptyIterator(errTransactionDone) 84 } 85 tr.mem.incref() 86 return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro) 87} 88 89func (tr *Transaction) flush() error { 90 // Flush memdb. 91 if tr.mem.Len() != 0 { 92 tr.stats.startTimer() 93 iter := tr.mem.NewIterator(nil) 94 t, n, err := tr.db.s.tops.createFrom(iter) 95 iter.Release() 96 tr.stats.stopTimer() 97 if err != nil { 98 return err 99 } 100 if tr.mem.getref() == 1 { 101 tr.mem.Reset() 102 } else { 103 tr.mem.decref() 104 tr.mem = tr.db.mpoolGet(0) 105 tr.mem.incref() 106 } 107 tr.tables = append(tr.tables, t) 108 tr.rec.addTableFile(0, t) 109 tr.stats.write += t.size 110 tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax) 111 } 112 return nil 113} 114 115func (tr *Transaction) put(kt keyType, key, value []byte) error { 116 tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt) 117 if tr.mem.Free() < len(tr.ikScratch)+len(value) { 118 if err := tr.flush(); err != nil { 119 return err 120 } 121 } 122 if err := tr.mem.Put(tr.ikScratch, value); err != nil { 123 return err 124 } 125 tr.seq++ 126 return nil 127} 128 129// Put sets the value for the given key. It overwrites any previous value 130// for that key; a DB is not a multi-map. 131// Please note that the transaction is not compacted until committed, so if you 132// writes 10 same keys, then those 10 same keys are in the transaction. 133// 134// It is safe to modify the contents of the arguments after Put returns. 135func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error { 136 tr.lk.Lock() 137 defer tr.lk.Unlock() 138 if tr.closed { 139 return errTransactionDone 140 } 141 return tr.put(keyTypeVal, key, value) 142} 143 144// Delete deletes the value for the given key. 145// Please note that the transaction is not compacted until committed, so if you 146// writes 10 same keys, then those 10 same keys are in the transaction. 147// 148// It is safe to modify the contents of the arguments after Delete returns. 149func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error { 150 tr.lk.Lock() 151 defer tr.lk.Unlock() 152 if tr.closed { 153 return errTransactionDone 154 } 155 return tr.put(keyTypeDel, key, nil) 156} 157 158// Write apply the given batch to the transaction. The batch will be applied 159// sequentially. 160// Please note that the transaction is not compacted until committed, so if you 161// writes 10 same keys, then those 10 same keys are in the transaction. 162// 163// It is safe to modify the contents of the arguments after Write returns. 164func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error { 165 if b == nil || b.Len() == 0 { 166 return nil 167 } 168 169 tr.lk.Lock() 170 defer tr.lk.Unlock() 171 if tr.closed { 172 return errTransactionDone 173 } 174 return b.replayInternal(func(i int, kt keyType, k, v []byte) error { 175 return tr.put(kt, k, v) 176 }) 177} 178 179func (tr *Transaction) setDone() { 180 tr.closed = true 181 tr.db.tr = nil 182 tr.mem.decref() 183 <-tr.db.writeLockC 184} 185 186// Commit commits the transaction. If error is not nil, then the transaction is 187// not committed, it can then either be retried or discarded. 188// 189// Other methods should not be called after transaction has been committed. 190func (tr *Transaction) Commit() error { 191 if err := tr.db.ok(); err != nil { 192 return err 193 } 194 195 tr.lk.Lock() 196 defer tr.lk.Unlock() 197 if tr.closed { 198 return errTransactionDone 199 } 200 if err := tr.flush(); err != nil { 201 // Return error, lets user decide either to retry or discard 202 // transaction. 203 return err 204 } 205 if len(tr.tables) != 0 { 206 // Committing transaction. 207 tr.rec.setSeqNum(tr.seq) 208 tr.db.compCommitLk.Lock() 209 tr.stats.startTimer() 210 var cerr error 211 for retry := 0; retry < 3; retry++ { 212 cerr = tr.db.s.commit(&tr.rec) 213 if cerr != nil { 214 tr.db.logf("transaction@commit error R·%d %q", retry, cerr) 215 select { 216 case <-time.After(time.Second): 217 case <-tr.db.closeC: 218 tr.db.logf("transaction@commit exiting") 219 tr.db.compCommitLk.Unlock() 220 return cerr 221 } 222 } else { 223 // Success. Set db.seq. 224 tr.db.setSeq(tr.seq) 225 break 226 } 227 } 228 tr.stats.stopTimer() 229 if cerr != nil { 230 // Return error, lets user decide either to retry or discard 231 // transaction. 232 return cerr 233 } 234 235 // Update compaction stats. This is safe as long as we hold compCommitLk. 236 tr.db.compStats.addStat(0, &tr.stats) 237 238 // Trigger table auto-compaction. 239 tr.db.compTrigger(tr.db.tcompCmdC) 240 tr.db.compCommitLk.Unlock() 241 242 // Additionally, wait compaction when certain threshold reached. 243 // Ignore error, returns error only if transaction can't be committed. 244 tr.db.waitCompaction() 245 } 246 // Only mark as done if transaction committed successfully. 247 tr.setDone() 248 return nil 249} 250 251func (tr *Transaction) discard() { 252 // Discard transaction. 253 for _, t := range tr.tables { 254 tr.db.logf("transaction@discard @%d", t.fd.Num) 255 if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil { 256 tr.db.s.reuseFileNum(t.fd.Num) 257 } 258 } 259} 260 261// Discard discards the transaction. 262// 263// Other methods should not be called after transaction has been discarded. 264func (tr *Transaction) Discard() { 265 tr.lk.Lock() 266 if !tr.closed { 267 tr.discard() 268 tr.setDone() 269 } 270 tr.lk.Unlock() 271} 272 273func (db *DB) waitCompaction() error { 274 if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() { 275 return db.compTriggerWait(db.tcompCmdC) 276 } 277 return nil 278} 279 280// OpenTransaction opens an atomic DB transaction. Only one transaction can be 281// opened at a time. Subsequent call to Write and OpenTransaction will be blocked 282// until in-flight transaction is committed or discarded. 283// The returned transaction handle is safe for concurrent use. 284// 285// Transaction is expensive and can overwhelm compaction, especially if 286// transaction size is small. Use with caution. 287// 288// The transaction must be closed once done, either by committing or discarding 289// the transaction. 290// Closing the DB will discard open transaction. 291func (db *DB) OpenTransaction() (*Transaction, error) { 292 if err := db.ok(); err != nil { 293 return nil, err 294 } 295 296 // The write happen synchronously. 297 select { 298 case db.writeLockC <- struct{}{}: 299 case err := <-db.compPerErrC: 300 return nil, err 301 case <-db.closeC: 302 return nil, ErrClosed 303 } 304 305 if db.tr != nil { 306 panic("leveldb: has open transaction") 307 } 308 309 // Flush current memdb. 310 if db.mem != nil && db.mem.Len() != 0 { 311 if _, err := db.rotateMem(0, true); err != nil { 312 return nil, err 313 } 314 } 315 316 // Wait compaction when certain threshold reached. 317 if err := db.waitCompaction(); err != nil { 318 return nil, err 319 } 320 321 tr := &Transaction{ 322 db: db, 323 seq: db.seq, 324 mem: db.mpoolGet(0), 325 } 326 tr.mem.incref() 327 db.tr = tr 328 return tr, nil 329} 330