1// Copyright (c) 2016, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"errors"
11	"sync"
12	"time"
13
14	"github.com/syndtr/goleveldb/leveldb/iterator"
15	"github.com/syndtr/goleveldb/leveldb/opt"
16	"github.com/syndtr/goleveldb/leveldb/util"
17)
18
// errTransactionDone is reported by Transaction methods that are called after
// the transaction has been closed, i.e. committed or discarded.
var errTransactionDone = errors.New("leveldb: transaction already closed")
20
// Transaction is the transaction handle.
//
// Writes are staged in mem and, when mem runs out of space or on commit,
// spilled by flush into table files that are tracked in tables and rec.
// Commit applies rec to the session and advances the DB sequence number;
// Discard removes the staged table files from storage instead.
type Transaction struct {
	db        *DB           // DB that opened this transaction
	lk        sync.RWMutex  // guards the fields below
	seq       uint64        // sequence number of the latest staged write
	mem       *memDB        // memdb holding writes not yet flushed
	tables    tFiles        // tables created by flush, pending commit
	ikScratch []byte        // reusable buffer for building internal keys
	rec       sessionRecord // session record applied on commit
	stats     cStatStaging  // flush/commit timing and bytes written
	closed    bool          // set once committed or discarded
}
33
34// Get gets the value for the given key. It returns ErrNotFound if the
35// DB does not contains the key.
36//
37// The returned slice is its own copy, it is safe to modify the contents
38// of the returned slice.
39// It is safe to modify the contents of the argument after Get returns.
40func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
41	tr.lk.RLock()
42	defer tr.lk.RUnlock()
43	if tr.closed {
44		return nil, errTransactionDone
45	}
46	return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
47}
48
49// Has returns true if the DB does contains the given key.
50//
51// It is safe to modify the contents of the argument after Has returns.
52func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
53	tr.lk.RLock()
54	defer tr.lk.RUnlock()
55	if tr.closed {
56		return false, errTransactionDone
57	}
58	return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
59}
60
61// NewIterator returns an iterator for the latest snapshot of the transaction.
62// The returned iterator is not safe for concurrent use, but it is safe to use
63// multiple iterators concurrently, with each in a dedicated goroutine.
64// It is also safe to use an iterator concurrently while writes to the
65// transaction. The resultant key/value pairs are guaranteed to be consistent.
66//
67// Slice allows slicing the iterator to only contains keys in the given
68// range. A nil Range.Start is treated as a key before all keys in the
69// DB. And a nil Range.Limit is treated as a key after all keys in
70// the DB.
71//
72// WARNING: Any slice returned by interator (e.g. slice returned by calling
73// Iterator.Key() or Iterator.Key() methods), its content should not be modified
74// unless noted otherwise.
75//
76// The iterator must be released after use, by calling Release method.
77//
78// Also read Iterator documentation of the leveldb/iterator package.
79func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
80	tr.lk.RLock()
81	defer tr.lk.RUnlock()
82	if tr.closed {
83		return iterator.NewEmptyIterator(errTransactionDone)
84	}
85	tr.mem.incref()
86	return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
87}
88
89func (tr *Transaction) flush() error {
90	// Flush memdb.
91	if tr.mem.Len() != 0 {
92		tr.stats.startTimer()
93		iter := tr.mem.NewIterator(nil)
94		t, n, err := tr.db.s.tops.createFrom(iter)
95		iter.Release()
96		tr.stats.stopTimer()
97		if err != nil {
98			return err
99		}
100		if tr.mem.getref() == 1 {
101			tr.mem.Reset()
102		} else {
103			tr.mem.decref()
104			tr.mem = tr.db.mpoolGet(0)
105			tr.mem.incref()
106		}
107		tr.tables = append(tr.tables, t)
108		tr.rec.addTableFile(0, t)
109		tr.stats.write += t.size
110		tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(int(t.size)), t.imin, t.imax)
111	}
112	return nil
113}
114
115func (tr *Transaction) put(kt keyType, key, value []byte) error {
116	tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
117	if tr.mem.Free() < len(tr.ikScratch)+len(value) {
118		if err := tr.flush(); err != nil {
119			return err
120		}
121	}
122	if err := tr.mem.Put(tr.ikScratch, value); err != nil {
123		return err
124	}
125	tr.seq++
126	return nil
127}
128
129// Put sets the value for the given key. It overwrites any previous value
130// for that key; a DB is not a multi-map.
131// Please note that the transaction is not compacted until committed, so if you
132// writes 10 same keys, then those 10 same keys are in the transaction.
133//
134// It is safe to modify the contents of the arguments after Put returns.
135func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
136	tr.lk.Lock()
137	defer tr.lk.Unlock()
138	if tr.closed {
139		return errTransactionDone
140	}
141	return tr.put(keyTypeVal, key, value)
142}
143
144// Delete deletes the value for the given key.
145// Please note that the transaction is not compacted until committed, so if you
146// writes 10 same keys, then those 10 same keys are in the transaction.
147//
148// It is safe to modify the contents of the arguments after Delete returns.
149func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
150	tr.lk.Lock()
151	defer tr.lk.Unlock()
152	if tr.closed {
153		return errTransactionDone
154	}
155	return tr.put(keyTypeDel, key, nil)
156}
157
158// Write apply the given batch to the transaction. The batch will be applied
159// sequentially.
160// Please note that the transaction is not compacted until committed, so if you
161// writes 10 same keys, then those 10 same keys are in the transaction.
162//
163// It is safe to modify the contents of the arguments after Write returns.
164func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
165	if b == nil || b.Len() == 0 {
166		return nil
167	}
168
169	tr.lk.Lock()
170	defer tr.lk.Unlock()
171	if tr.closed {
172		return errTransactionDone
173	}
174	return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
175		return tr.put(kt, k, v)
176	})
177}
178
// setDone marks the transaction as closed, detaches it from the DB, drops the
// transaction's reference on its memdb and finally gives back the DB write
// lock that OpenTransaction took by sending on writeLockC. It is called by
// Commit and Discard with tr.lk held.
func (tr *Transaction) setDone() {
	tr.closed = true
	tr.db.tr = nil
	tr.mem.decref()
	// Receiving from writeLockC releases the write lock.
	<-tr.db.writeLockC
}
185
186// Commit commits the transaction. If error is not nil, then the transaction is
187// not committed, it can then either be retried or discarded.
188//
189// Other methods should not be called after transaction has been committed.
190func (tr *Transaction) Commit() error {
191	if err := tr.db.ok(); err != nil {
192		return err
193	}
194
195	tr.lk.Lock()
196	defer tr.lk.Unlock()
197	if tr.closed {
198		return errTransactionDone
199	}
200	if err := tr.flush(); err != nil {
201		// Return error, lets user decide either to retry or discard
202		// transaction.
203		return err
204	}
205	if len(tr.tables) != 0 {
206		// Committing transaction.
207		tr.rec.setSeqNum(tr.seq)
208		tr.db.compCommitLk.Lock()
209		tr.stats.startTimer()
210		var cerr error
211		for retry := 0; retry < 3; retry++ {
212			cerr = tr.db.s.commit(&tr.rec)
213			if cerr != nil {
214				tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
215				select {
216				case <-time.After(time.Second):
217				case <-tr.db.closeC:
218					tr.db.logf("transaction@commit exiting")
219					tr.db.compCommitLk.Unlock()
220					return cerr
221				}
222			} else {
223				// Success. Set db.seq.
224				tr.db.setSeq(tr.seq)
225				break
226			}
227		}
228		tr.stats.stopTimer()
229		if cerr != nil {
230			// Return error, lets user decide either to retry or discard
231			// transaction.
232			return cerr
233		}
234
235		// Update compaction stats. This is safe as long as we hold compCommitLk.
236		tr.db.compStats.addStat(0, &tr.stats)
237
238		// Trigger table auto-compaction.
239		tr.db.compTrigger(tr.db.tcompCmdC)
240		tr.db.compCommitLk.Unlock()
241
242		// Additionally, wait compaction when certain threshold reached.
243		// Ignore error, returns error only if transaction can't be committed.
244		tr.db.waitCompaction()
245	}
246	// Only mark as done if transaction committed successfully.
247	tr.setDone()
248	return nil
249}
250
251func (tr *Transaction) discard() {
252	// Discard transaction.
253	for _, t := range tr.tables {
254		tr.db.logf("transaction@discard @%d", t.fd.Num)
255		if err1 := tr.db.s.stor.Remove(t.fd); err1 == nil {
256			tr.db.s.reuseFileNum(t.fd.Num)
257		}
258	}
259}
260
261// Discard discards the transaction.
262//
263// Other methods should not be called after transaction has been discarded.
264func (tr *Transaction) Discard() {
265	tr.lk.Lock()
266	if !tr.closed {
267		tr.discard()
268		tr.setDone()
269	}
270	tr.lk.Unlock()
271}
272
273func (db *DB) waitCompaction() error {
274	if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
275		return db.compTriggerWait(db.tcompCmdC)
276	}
277	return nil
278}
279
280// OpenTransaction opens an atomic DB transaction. Only one transaction can be
281// opened at a time. Subsequent call to Write and OpenTransaction will be blocked
282// until in-flight transaction is committed or discarded.
283// The returned transaction handle is safe for concurrent use.
284//
285// Transaction is expensive and can overwhelm compaction, especially if
286// transaction size is small. Use with caution.
287//
288// The transaction must be closed once done, either by committing or discarding
289// the transaction.
290// Closing the DB will discard open transaction.
291func (db *DB) OpenTransaction() (*Transaction, error) {
292	if err := db.ok(); err != nil {
293		return nil, err
294	}
295
296	// The write happen synchronously.
297	select {
298	case db.writeLockC <- struct{}{}:
299	case err := <-db.compPerErrC:
300		return nil, err
301	case <-db.closeC:
302		return nil, ErrClosed
303	}
304
305	if db.tr != nil {
306		panic("leveldb: has open transaction")
307	}
308
309	// Flush current memdb.
310	if db.mem != nil && db.mem.Len() != 0 {
311		if _, err := db.rotateMem(0, true); err != nil {
312			return nil, err
313		}
314	}
315
316	// Wait compaction when certain threshold reached.
317	if err := db.waitCompaction(); err != nil {
318		return nil, err
319	}
320
321	tr := &Transaction{
322		db:  db,
323		seq: db.seq,
324		mem: db.mpoolGet(0),
325	}
326	tr.mem.incref()
327	db.tr = tr
328	return tr, nil
329}
330