1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
2// All rights reserved.
3//
4// Use of this source code is governed by a BSD-style license that can be
5// found in the LICENSE file.
6
7package leveldb
8
9import (
10	"time"
11
12	"github.com/syndtr/goleveldb/leveldb/memdb"
13	"github.com/syndtr/goleveldb/leveldb/opt"
14	"github.com/syndtr/goleveldb/leveldb/util"
15)
16
17func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
18	wr, err := db.journal.Next()
19	if err != nil {
20		return err
21	}
22	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
23		return err
24	}
25	if err := db.journal.Flush(); err != nil {
26		return err
27	}
28	if sync {
29		return db.journalWriter.Sync()
30	}
31	return nil
32}
33
34func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
35	// Wait for pending memdb compaction.
36	err = db.compTriggerWait(db.mcompCmdC)
37	if err != nil {
38		return
39	}
40
41	// Create new memdb and journal.
42	mem, err = db.newMem(n)
43	if err != nil {
44		return
45	}
46
47	// Schedule memdb compaction.
48	if wait {
49		err = db.compTriggerWait(db.mcompCmdC)
50	} else {
51		db.compTrigger(db.mcompCmdC)
52	}
53	return
54}
55
// flush makes room for a write of n bytes in the effective memdb,
// throttling or pausing the caller when level-0 is falling behind, and
// rotating the memdb when it is too full. It returns the effective memdb
// (with a reference held; the caller must decref) and its free space.
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
	delayed := false
	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
	flush := func() (retry bool) {
		mdb = db.getEffectiveMem()
		if mdb == nil {
			err = ErrClosed
			return false
		}
		defer func() {
			// On retry, drop this round's reference; the next round
			// re-acquires the (possibly different) effective memdb.
			if retry {
				mdb.decref()
				mdb = nil
			}
		}()
		tLen := db.s.tLen(0)
		mdbFree = mdb.Free()
		switch {
		case tLen >= slowdownTrigger && !delayed:
			// Level-0 is getting crowded: back off once for 1ms, then retry.
			delayed = true
			time.Sleep(time.Millisecond)
		case mdbFree >= n:
			// Enough room in the current memdb; done.
			return false
		case tLen >= pauseTrigger:
			// Level-0 is full: block until table compaction makes progress.
			delayed = true
			err = db.compTriggerWait(db.tcompCmdC)
			if err != nil {
				return false
			}
		default:
			// Allow memdb to grow if it has no entry.
			if mdb.Len() == 0 {
				mdbFree = n
			} else {
				// Memdb is too full for n bytes: rotate to a fresh one.
				mdb.decref()
				mdb, err = db.rotateMem(n, false)
				if err == nil {
					mdbFree = mdb.Free()
				} else {
					mdbFree = 0
				}
			}
			return false
		}
		return true
	}
	start := time.Now()
	for flush() {
	}
	if delayed {
		// Account the time this writer spent throttled.
		db.writeDelay += time.Since(start)
		db.writeDelayN++
	} else if db.writeDelayN > 0 {
		// First non-delayed write after a delayed stretch: log and reset stats.
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
		db.writeDelay = 0
		db.writeDelayN = 0
	}
	return
}
116
// writeMerge carries a pending write from a concurrent writer that wants
// to be merged into the write currently holding the write lock. It holds
// either a whole batch, or a single record as keyType/key/value.
type writeMerge struct {
	sync       bool   // writer requested a synced journal write
	batch      *Batch // non-nil for a whole-batch merge
	keyType    keyType
	key, value []byte
}
123
124func (db *DB) unlockWrite(overflow bool, merged int, err error) {
125	for i := 0; i < merged; i++ {
126		db.writeAckC <- err
127	}
128	if overflow {
129		// Pass lock to the next write (that failed to merge).
130		db.writeMergedC <- false
131	} else {
132		// Release lock.
133		<-db.writeLockC
134	}
135}
136
// writeLocked performs the actual write while holding the write lock: it
// flushes/rotates the memdb as needed, optionally merges pending concurrent
// writes, appends everything to the journal, and applies it to the memdb.
// The lock is released (or handed to an overflowed writer) via unlockWrite
// before returning.
//
// ourBatch, if non-nil, must be the same object as batch; it marks a batch
// owned by the DB (taken from db.batchPool) rather than by the caller.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
	// Try to flush memdb. This may also throttle the write if compaction
	// cannot catch up.
	mdb, mdbFree, err := db.flush(batch.internalLen)
	if err != nil {
		db.unlockWrite(false, 0, err)
		return err
	}
	defer mdb.decref()

	var (
		overflow bool // a pending write was too large to merge; hand it the lock
		merged   int  // number of concurrent writers merged into this write
		batches  = []*Batch{batch}
	)

	if merge {
		// Merge limit: up to 128KiB normally, or up to 1MiB total when the
		// leading batch is already larger than 128KiB.
		var mergeLimit int
		if batch.internalLen > 128<<10 {
			mergeLimit = (1 << 20) - batch.internalLen
		} else {
			mergeLimit = 128 << 10
		}
		// Never merge more than fits in the memdb alongside batch.
		mergeCap := mdbFree - batch.internalLen
		if mergeLimit > mergeCap {
			mergeLimit = mergeCap
		}

	merge:
		for mergeLimit > 0 {
			select {
			case incoming := <-db.writeMergeC:
				if incoming.batch != nil {
					// Merge batch.
					if incoming.batch.internalLen > mergeLimit {
						overflow = true
						break merge
					}
					batches = append(batches, incoming.batch)
					mergeLimit -= incoming.batch.internalLen
				} else {
					// Merge put.
					internalLen := len(incoming.key) + len(incoming.value) + 8
					if internalLen > mergeLimit {
						overflow = true
						break merge
					}
					if ourBatch == nil {
						// Lazily allocate a pooled batch to hold merged records.
						ourBatch = db.batchPool.Get().(*Batch)
						ourBatch.Reset()
						batches = append(batches, ourBatch)
					}
					// We can use same batch since concurrent write doesn't
					// guarantee write order.
					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
					mergeLimit -= internalLen
				}
				// Any merged writer requesting sync upgrades the whole write.
				sync = sync || incoming.sync
				merged++
				// Tell the merged writer to wait for the ack instead.
				db.writeMergedC <- true

			default:
				// No more pending writes to merge.
				break merge
			}
		}
	}

	// Seq number of the first record in this write.
	seq := db.seq + 1

	// Write journal before touching the memdb, so a crash can be replayed.
	if err := db.writeJournal(batches, seq, sync); err != nil {
		db.unlockWrite(overflow, merged, err)
		return err
	}

	// Put batches.
	for _, batch := range batches {
		if err := batch.putMem(seq, mdb.DB); err != nil {
			// The journal write already succeeded; failing to apply to the
			// memdb would leave the DB inconsistent, so treat it as fatal.
			panic(err)
		}
		seq += uint64(batch.Len())
	}

	// Incr seq number.
	db.addSeq(uint64(batchesLen(batches)))

	// Rotate memdb if it's reach the threshold.
	if batch.internalLen >= mdbFree {
		// NOTE(review): rotateMem error is ignored here — looks like a
		// deliberate best-effort rotation; confirm intended.
		db.rotateMem(0, false)
	}

	db.unlockWrite(overflow, merged, nil)
	return nil
}
234
// Write applies the given batch to the DB. The batch records will be applied
// sequentially. Write may be used concurrently; when used concurrently and
// the batch is small enough, Write will try to merge the batches. Set the
// NoWriteMerge option to true to disable write merge.
//
// It is safe to modify the contents of the arguments after Write returns but
// not before. Write will not modify the content of the batch.
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
	// Nothing to do for a nil or empty batch.
	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
		return err
	}

	// If the batch size is larger than write buffer, it may justified to write
	// using transaction instead. Using transaction the batch will be written
	// into tables directly, skipping the journaling.
	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
		tr, err := db.OpenTransaction()
		if err != nil {
			return err
		}
		if err := tr.Write(batch, wo); err != nil {
			tr.Discard()
			return err
		}
		return tr.Commit()
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
			// Offered the batch to the current lock holder for merging.
			if <-db.writeMergedC {
				// Write is merged; wait for the shared outcome.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	// ourBatch is nil: the batch belongs to the caller.
	return db.writeLocked(batch, nil, merge, sync)
}
298
// putRec writes a single put/delete record, either by merging it into a
// concurrent in-flight write, or by acquiring the write lock and writing
// it through a batch taken from db.batchPool. It backs both Put and Delete.
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil {
		return err
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
			// Offered the record to the current lock holder for merging.
			if <-db.writeMergedC {
				// Write is merged; wait for the shared outcome.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	// Wrap the record in a pooled batch; pass it as ourBatch too, marking
	// it DB-owned.
	batch := db.batchPool.Get().(*Batch)
	batch.Reset()
	batch.appendRec(kt, key, value)
	return db.writeLocked(batch, batch, merge, sync)
}
343
344// Put sets the value for the given key. It overwrites any previous value
345// for that key; a DB is not a multi-map. Write merge also applies for Put, see
346// Write.
347//
348// It is safe to modify the contents of the arguments after Put returns but not
349// before.
350func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
351	return db.putRec(keyTypeVal, key, value, wo)
352}
353
354// Delete deletes the value for the given key. Delete will not returns error if
355// key doesn't exist. Write merge also applies for Delete, see Write.
356//
357// It is safe to modify the contents of the arguments after Delete returns but
358// not before.
359func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
360	return db.putRec(keyTypeDel, key, nil, wo)
361}
362
363func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
364	iter := mem.NewIterator(nil)
365	defer iter.Release()
366	return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
367		(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
368}
369
370// CompactRange compacts the underlying DB for the given key range.
371// In particular, deleted and overwritten versions are discarded,
372// and the data is rearranged to reduce the cost of operations
373// needed to access the data. This operation should typically only
374// be invoked by users who understand the underlying implementation.
375//
376// A nil Range.Start is treated as a key before all keys in the DB.
377// And a nil Range.Limit is treated as a key after all keys in the DB.
378// Therefore if both is nil then it will compact entire DB.
379func (db *DB) CompactRange(r util.Range) error {
380	if err := db.ok(); err != nil {
381		return err
382	}
383
384	// Lock writer.
385	select {
386	case db.writeLockC <- struct{}{}:
387	case err := <-db.compPerErrC:
388		return err
389	case <-db.closeC:
390		return ErrClosed
391	}
392
393	// Check for overlaps in memdb.
394	mdb := db.getEffectiveMem()
395	if mdb == nil {
396		return ErrClosed
397	}
398	defer mdb.decref()
399	if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
400		// Memdb compaction.
401		if _, err := db.rotateMem(0, false); err != nil {
402			<-db.writeLockC
403			return err
404		}
405		<-db.writeLockC
406		if err := db.compTriggerWait(db.mcompCmdC); err != nil {
407			return err
408		}
409	} else {
410		<-db.writeLockC
411	}
412
413	// Table compaction.
414	return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
415}
416
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
func (db *DB) SetReadOnly() error {
	if err := db.ok(); err != nil {
		return err
	}

	// Lock writer. The lock is deliberately NOT released on success:
	// compWriteLocking records that compaction now holds it, which blocks
	// all future writers. Presumably it is released on close — confirm.
	select {
	case db.writeLockC <- struct{}{}:
		db.compWriteLocking = true
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Set compaction read-only: compactions will observe ErrReadOnly.
	select {
	case db.compErrSetC <- ErrReadOnly:
	case perr := <-db.compPerErrC:
		return perr
	case <-db.closeC:
		return ErrClosed
	}

	return nil
}
444