1// Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"errors"
11	"sync"
12	"time"
13
14	"github.com/syndtr/goleveldb/leveldb/iterator"
15	"github.com/syndtr/goleveldb/leveldb/opt"
16	"github.com/syndtr/goleveldb/leveldb/util"
17)
18
// errTransactionDone is the error reported by Transaction methods that are
// called after the transaction has been closed, i.e. committed or discarded.
var errTransactionDone = errors.New("leveldb: transaction already closed")
20
// Transaction is the transaction handle.
//
// A handle is created by DB.OpenTransaction and stays usable until it is
// closed by Commit or Discard.
type Transaction struct {
	// db is the DB this transaction was opened on.
	db        *DB
	// lk guards the fields below: read methods (Get, Has, NewIterator) take
	// it shared, mutating methods (Put, Delete, Write, Commit, Discard) take
	// it exclusively.
	lk        sync.RWMutex
	// seq starts at the DB sequence number at open time and is advanced by
	// one for every record successfully put into the transaction.
	seq       uint64
	// mem buffers records written to the transaction that have not been
	// flushed into a table yet.
	mem       *memDB
	// tables lists the table files produced by flush for this transaction.
	tables    tFiles
	// ikScratch is a scratch buffer reused to build the internal key of each
	// record being put.
	ikScratch []byte
	// rec accumulates the table files and sequence number that Commit hands
	// to the session.
	rec       sessionRecord
	// stats collects timing and written-bytes figures, merged into the DB
	// compaction stats on successful commit.
	stats     cStatStaging
	// closed is set once the transaction has been committed or discarded.
	closed    bool
}
33
34// Get gets the value for the given key. It returns ErrNotFound if the
35// DB does not contains the key.
36//
37// The returned slice is its own copy, it is safe to modify the contents
38// of the returned slice.
39// It is safe to modify the contents of the argument after Get returns.
40func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
41	tr.lk.RLock()
42	defer tr.lk.RUnlock()
43	if tr.closed {
44		return nil, errTransactionDone
45	}
46	return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
47}
48
49// Has returns true if the DB does contains the given key.
50//
51// It is safe to modify the contents of the argument after Has returns.
52func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
53	tr.lk.RLock()
54	defer tr.lk.RUnlock()
55	if tr.closed {
56		return false, errTransactionDone
57	}
58	return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
59}
60
61// NewIterator returns an iterator for the latest snapshot of the transaction.
62// The returned iterator is not safe for concurrent use, but it is safe to use
63// multiple iterators concurrently, with each in a dedicated goroutine.
64// It is also safe to use an iterator concurrently while writes to the
65// transaction. The resultant key/value pairs are guaranteed to be consistent.
66//
67// Slice allows slicing the iterator to only contains keys in the given
68// range. A nil Range.Start is treated as a key before all keys in the
69// DB. And a nil Range.Limit is treated as a key after all keys in
70// the DB.
71//
72// The returned iterator has locks on its own resources, so it can live beyond
73// the lifetime of the transaction who creates them.
74//
75// WARNING: Any slice returned by interator (e.g. slice returned by calling
76// Iterator.Key() or Iterator.Key() methods), its content should not be modified
77// unless noted otherwise.
78//
79// The iterator must be released after use, by calling Release method.
80//
81// Also read Iterator documentation of the leveldb/iterator package.
82func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
83	tr.lk.RLock()
84	defer tr.lk.RUnlock()
85	if tr.closed {
86		return iterator.NewEmptyIterator(errTransactionDone)
87	}
88	tr.mem.incref()
89	return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
90}
91
92func (tr *Transaction) flush() error {
93	// Flush memdb.
94	if tr.mem.Len() != 0 {
95		tr.stats.startTimer()
96		iter := tr.mem.NewIterator(nil)
97		t, n, err := tr.db.s.tops.createFrom(iter)
98		iter.Release()
99		tr.stats.stopTimer()
100		if err != nil {
101			return err
102		}
103		if tr.mem.getref() == 1 {
104			tr.mem.Reset()
105		} else {
106			tr.mem.decref()
107			tr.mem = tr.db.mpoolGet(0)
108			tr.mem.incref()
109		}
110		tr.tables = append(tr.tables, t)
111		tr.rec.addTableFile(0, t)
112		tr.stats.write += t.size
113		tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
114	}
115	return nil
116}
117
118func (tr *Transaction) put(kt keyType, key, value []byte) error {
119	tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
120	if tr.mem.Free() < len(tr.ikScratch)+len(value) {
121		if err := tr.flush(); err != nil {
122			return err
123		}
124	}
125	if err := tr.mem.Put(tr.ikScratch, value); err != nil {
126		return err
127	}
128	tr.seq++
129	return nil
130}
131
132// Put sets the value for the given key. It overwrites any previous value
133// for that key; a DB is not a multi-map.
134// Please note that the transaction is not compacted until committed, so if you
135// writes 10 same keys, then those 10 same keys are in the transaction.
136//
137// It is safe to modify the contents of the arguments after Put returns.
138func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
139	tr.lk.Lock()
140	defer tr.lk.Unlock()
141	if tr.closed {
142		return errTransactionDone
143	}
144	return tr.put(keyTypeVal, key, value)
145}
146
147// Delete deletes the value for the given key.
148// Please note that the transaction is not compacted until committed, so if you
149// writes 10 same keys, then those 10 same keys are in the transaction.
150//
151// It is safe to modify the contents of the arguments after Delete returns.
152func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
153	tr.lk.Lock()
154	defer tr.lk.Unlock()
155	if tr.closed {
156		return errTransactionDone
157	}
158	return tr.put(keyTypeDel, key, nil)
159}
160
161// Write apply the given batch to the transaction. The batch will be applied
162// sequentially.
163// Please note that the transaction is not compacted until committed, so if you
164// writes 10 same keys, then those 10 same keys are in the transaction.
165//
166// It is safe to modify the contents of the arguments after Write returns.
167func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
168	if b == nil || b.Len() == 0 {
169		return nil
170	}
171
172	tr.lk.Lock()
173	defer tr.lk.Unlock()
174	if tr.closed {
175		return errTransactionDone
176	}
177	return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
178		return tr.put(kt, k, v)
179	})
180}
181
// setDone closes the transaction: it marks the handle as closed, detaches it
// from the DB, drops the transaction's reference on its memdb and finally
// gives up the write-lock slot that OpenTransaction took on writeLockC.
// Both callers in this file (Commit and Discard) invoke it with tr.lk held.
func (tr *Transaction) setDone() {
	tr.closed = true
	tr.db.tr = nil
	tr.mem.decref()
	// Receiving from writeLockC frees the slot filled by OpenTransaction,
	// letting blocked writers and OpenTransaction calls proceed.
	<-tr.db.writeLockC
}
188
189// Commit commits the transaction. If error is not nil, then the transaction is
190// not committed, it can then either be retried or discarded.
191//
192// Other methods should not be called after transaction has been committed.
193func (tr *Transaction) Commit() error {
194	if err := tr.db.ok(); err != nil {
195		return err
196	}
197
198	tr.lk.Lock()
199	defer tr.lk.Unlock()
200	if tr.closed {
201		return errTransactionDone
202	}
203	if err := tr.flush(); err != nil {
204		// Return error, lets user decide either to retry or discard
205		// transaction.
206		return err
207	}
208	if len(tr.tables) != 0 {
209		// Committing transaction.
210		tr.rec.setSeqNum(tr.seq)
211		tr.db.compCommitLk.Lock()
212		tr.stats.startTimer()
213		var cerr error
214		for retry := 0; retry < 3; retry++ {
215			cerr = tr.db.s.commit(&tr.rec, false)
216			if cerr != nil {
217				tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
218				select {
219				case <-time.After(time.Second):
220				case <-tr.db.closeC:
221					tr.db.logf("transaction@commit exiting")
222					tr.db.compCommitLk.Unlock()
223					return cerr
224				}
225			} else {
226				// Success. Set db.seq.
227				tr.db.setSeq(tr.seq)
228				break
229			}
230		}
231		tr.stats.stopTimer()
232		if cerr != nil {
233			// Return error, lets user decide either to retry or discard
234			// transaction.
235			return cerr
236		}
237
238		// Update compaction stats. This is safe as long as we hold compCommitLk.
239		tr.db.compStats.addStat(0, &tr.stats)
240
241		// Trigger table auto-compaction.
242		tr.db.compTrigger(tr.db.tcompCmdC)
243		tr.db.compCommitLk.Unlock()
244
245		// Additionally, wait compaction when certain threshold reached.
246		// Ignore error, returns error only if transaction can't be committed.
247		tr.db.waitCompaction()
248	}
249	// Only mark as done if transaction committed successfully.
250	tr.setDone()
251	return nil
252}
253
254func (tr *Transaction) discard() {
255	// Discard transaction.
256	for _, t := range tr.tables {
257		tr.db.logf("transaction@discard @%d", t.fd.Num)
258		// Iterator may still use the table, so we use tOps.remove here.
259		tr.db.s.tops.remove(t.fd)
260	}
261}
262
263// Discard discards the transaction.
264// This method is noop if transaction is already closed (either committed or
265// discarded)
266//
267// Other methods should not be called after transaction has been discarded.
268func (tr *Transaction) Discard() {
269	tr.lk.Lock()
270	if !tr.closed {
271		tr.discard()
272		tr.setDone()
273	}
274	tr.lk.Unlock()
275}
276
277func (db *DB) waitCompaction() error {
278	if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
279		return db.compTriggerWait(db.tcompCmdC)
280	}
281	return nil
282}
283
284// OpenTransaction opens an atomic DB transaction. Only one transaction can be
285// opened at a time. Subsequent call to Write and OpenTransaction will be blocked
286// until in-flight transaction is committed or discarded.
287// The returned transaction handle is safe for concurrent use.
288//
289// Transaction is very expensive and can overwhelm compaction, especially if
290// transaction size is small. Use with caution.
291// The rule of thumb is if you need to merge at least same amount of
292// `Options.WriteBuffer` worth of data then use transaction, otherwise don't.
293//
294// The transaction must be closed once done, either by committing or discarding
295// the transaction.
296// Closing the DB will discard open transaction.
297func (db *DB) OpenTransaction() (*Transaction, error) {
298	if err := db.ok(); err != nil {
299		return nil, err
300	}
301
302	// The write happen synchronously.
303	select {
304	case db.writeLockC <- struct{}{}:
305	case err := <-db.compPerErrC:
306		return nil, err
307	case <-db.closeC:
308		return nil, ErrClosed
309	}
310
311	if db.tr != nil {
312		panic("leveldb: has open transaction")
313	}
314
315	// Flush current memdb.
316	if db.mem != nil && db.mem.Len() != 0 {
317		if _, err := db.rotateMem(0, true); err != nil {
318			return nil, err
319		}
320	}
321
322	// Wait compaction when certain threshold reached.
323	if err := db.waitCompaction(); err != nil {
324		return nil, err
325	}
326
327	tr := &Transaction{
328		db:  db,
329		seq: db.seq,
330		mem: db.mpoolGet(0),
331	}
332	tr.mem.incref()
333	db.tr = tr
334	return tr, nil
335}
336