// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"sync/atomic"
	"time"

	"github.com/syndtr/goleveldb/leveldb/memdb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)

func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
	wr, err := db.journal.Next()
	if err != nil {
		return err
	}
	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
		return err
	}
	if err := db.journal.Flush(); err != nil {
		return err
	}
	if sync {
		return db.journalWriter.Sync()
	}
	return nil
}

func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
	retryLimit := 3
retry:
	// Wait for pending memdb compaction.
	err = db.compTriggerWait(db.mcompCmdC)
	if err != nil {
		return
	}
	retryLimit--

	// Create new memdb and journal.
	mem, err = db.newMem(n)
	if err != nil {
		if err == errHasFrozenMem {
			if retryLimit <= 0 {
				panic("BUG: still has frozen memdb")
			}
			goto retry
		}
		return
	}

	// Schedule memdb compaction.
	if wait {
		err = db.compTriggerWait(db.mcompCmdC)
	} else {
		db.compTrigger(db.mcompCmdC)
	}
	return
}

func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
	delayed := false
	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
	flush := func() (retry bool) {
		mdb = db.getEffectiveMem()
		if mdb == nil {
			err = ErrClosed
			return false
		}
		defer func() {
			if retry {
				mdb.decref()
				mdb = nil
			}
		}()
		tLen := db.s.tLen(0)
		mdbFree = mdb.Free()
		switch {
		case tLen >= slowdownTrigger && !delayed:
			delayed = true
			time.Sleep(time.Millisecond)
		case mdbFree >= n:
			return false
		case tLen >= pauseTrigger:
			delayed = true
			// Set the write paused flag explicitly.
			atomic.StoreInt32(&db.inWritePaused, 1)
			err = db.compTriggerWait(db.tcompCmdC)
			// Unset the write paused flag.
			atomic.StoreInt32(&db.inWritePaused, 0)
			if err != nil {
				return false
			}
		default:
			// Allow memdb to grow if it has no entries.
			if mdb.Len() == 0 {
				mdbFree = n
			} else {
				mdb.decref()
				mdb, err = db.rotateMem(n, false)
				if err == nil {
					mdbFree = mdb.Free()
				} else {
					mdbFree = 0
				}
			}
			return false
		}
		return true
	}
	start := time.Now()
	for flush() {
	}
	if delayed {
		db.writeDelay += time.Since(start)
		db.writeDelayN++
	} else if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
		atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
		atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
		db.writeDelay = 0
		db.writeDelayN = 0
	}
	return
}
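
// The throttling in flush is driven by the number of level-0 tables relative
// to two configurable triggers. A minimal tuning sketch from a hypothetical
// client's perspective (the option fields are the real ones from the opt
// package; the values here are made up for illustration):
//
//	o := &opt.Options{
//		WriteL0SlowdownTrigger: 16, // start delaying writes at 16 level-0 tables
//		WriteL0PauseTrigger:    24, // pause writes entirely at 24 level-0 tables
//	}
//	db, err := leveldb.OpenFile("/path/to/db", o)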

// writeMerge carries either a whole batch or a single put/delete record that
// a concurrent writer asks the current lock holder to merge into its write.
type writeMerge struct {
	sync       bool
	batch      *Batch
	keyType    keyType
	key, value []byte
}

func (db *DB) unlockWrite(overflow bool, merged int, err error) {
	for i := 0; i < merged; i++ {
		db.writeAckC <- err
	}
	if overflow {
		// Pass lock to the next write (that failed to merge).
		db.writeMergedC <- false
	} else {
		// Release lock.
		<-db.writeLockC
	}
}

// ourBatch is a batch that we can modify.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
	// Try to flush the memdb. This method will also try to throttle writes
	// if they are too fast and compaction cannot catch up.
	mdb, mdbFree, err := db.flush(batch.internalLen)
	if err != nil {
		db.unlockWrite(false, 0, err)
		return err
	}
	defer mdb.decref()

	var (
		overflow bool
		merged   int
		batches  = []*Batch{batch}
	)

	if merge {
		// Merge limit.
		var mergeLimit int
		if batch.internalLen > 128<<10 {
			mergeLimit = (1 << 20) - batch.internalLen
		} else {
			mergeLimit = 128 << 10
		}
		mergeCap := mdbFree - batch.internalLen
		if mergeLimit > mergeCap {
			mergeLimit = mergeCap
		}
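
		// Illustrative note (not in the original source): a batch of at most
		// 128 KiB opens a full 128 KiB merge window, while a larger batch may
		// only absorb merges up to a 1 MiB combined total; either way the
		// window is capped by the memdb space left after the batch itself.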

	merge:
		for mergeLimit > 0 {
			select {
			case incoming := <-db.writeMergeC:
				if incoming.batch != nil {
					// Merge batch.
					if incoming.batch.internalLen > mergeLimit {
						overflow = true
						break merge
					}
					batches = append(batches, incoming.batch)
					mergeLimit -= incoming.batch.internalLen
				} else {
					// Merge put.
					internalLen := len(incoming.key) + len(incoming.value) + 8
					if internalLen > mergeLimit {
						overflow = true
						break merge
					}
					if ourBatch == nil {
						ourBatch = db.batchPool.Get().(*Batch)
						ourBatch.Reset()
						batches = append(batches, ourBatch)
					}
					// We can use the same batch since concurrent writes
					// don't guarantee write order.
					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
					mergeLimit -= internalLen
				}
				sync = sync || incoming.sync
				merged++
				db.writeMergedC <- true

			default:
				break merge
			}
		}
	}

	// Release ourBatch if any.
	if ourBatch != nil {
		defer db.batchPool.Put(ourBatch)
	}

	// Seq number.
	seq := db.seq + 1

	// Write journal.
	if err := db.writeJournal(batches, seq, sync); err != nil {
		db.unlockWrite(overflow, merged, err)
		return err
	}

	// Put batches.
	for _, batch := range batches {
		if err := batch.putMem(seq, mdb.DB); err != nil {
			panic(err)
		}
		seq += uint64(batch.Len())
	}

	// Increment the sequence number.
	db.addSeq(uint64(batchesLen(batches)))

	// Rotate the memdb if it has reached the threshold.
	if batch.internalLen >= mdbFree {
		db.rotateMem(0, false)
	}

	db.unlockWrite(overflow, merged, nil)
	return nil
}

// Write applies the given batch to the DB. The batch records will be applied
// sequentially. Write may be used concurrently; when used concurrently and
// the batch is small enough, Write will try to merge the batches. Set the
// NoWriteMerge option to true to disable write merging.
//
// It is safe to modify the contents of the arguments after Write returns but
// not before. Write will not modify the content of the batch.
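//
// A minimal usage sketch (hypothetical client code, error handling
// abbreviated):
//
//	batch := new(leveldb.Batch)
//	batch.Put([]byte("foo"), []byte("value of foo"))
//	batch.Delete([]byte("bar"))
//	if err := db.Write(batch, nil); err != nil {
//		// handle error
//	}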
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
		return err
	}

	// If the batch size is larger than the write buffer, it may be justified
	// to write it using a transaction instead. With a transaction, the batch
	// will be written into tables directly, skipping the journal.
	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
		tr, err := db.OpenTransaction()
		if err != nil {
			return err
		}
		if err := tr.Write(batch, wo); err != nil {
			tr.Discard()
			return err
		}
		return tr.Commit()
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
			if <-db.writeMergedC {
				// Write is merged.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed.
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed.
			return ErrClosed
		}
	}

	return db.writeLocked(batch, nil, merge, sync)
}

func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil {
		return err
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
			if <-db.writeMergedC {
				// Write is merged.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed.
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed.
			return ErrClosed
		}
	}

	batch := db.batchPool.Get().(*Batch)
	batch.Reset()
	batch.appendRec(kt, key, value)
	return db.writeLocked(batch, batch, merge, sync)
}

// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. Write merging also applies to Put,
// see Write.
//
// It is safe to modify the contents of the arguments after Put returns but
// not before.
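//
// A minimal usage sketch (hypothetical client code):
//
//	if err := db.Put([]byte("foo"), []byte("bar"), nil); err != nil {
//		// handle error
//	}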
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeVal, key, value, wo)
}

// Delete deletes the value for the given key. Delete will not return an
// error if the key doesn't exist. Write merging also applies to Delete,
// see Write.
//
// It is safe to modify the contents of the arguments after Delete returns
// but not before.
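//
// A minimal usage sketch (hypothetical client code):
//
//	if err := db.Delete([]byte("foo"), nil); err != nil {
//		// handle error
//	}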
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeDel, key, nil, wo)
}

func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
	iter := mem.NewIterator(nil)
	defer iter.Release()
	return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
		(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
}

// CompactRange compacts the underlying DB for the given key range.
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// A nil Range.Start is treated as a key before all keys in the DB, and a
// nil Range.Limit is treated as a key after all keys in the DB. Therefore
// if both are nil, the entire DB will be compacted.
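//
// A minimal usage sketch (hypothetical client code) that compacts the
// entire DB:
//
//	if err := db.CompactRange(util.Range{}); err != nil {
//		// handle error
//	}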
func (db *DB) CompactRange(r util.Range) error {
	if err := db.ok(); err != nil {
		return err
	}

	// Lock writer.
	select {
	case db.writeLockC <- struct{}{}:
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Check for overlaps in memdb.
	mdb := db.getEffectiveMem()
	if mdb == nil {
		return ErrClosed
	}
	defer mdb.decref()
	if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
		// Memdb compaction.
		if _, err := db.rotateMem(0, false); err != nil {
			<-db.writeLockC
			return err
		}
		<-db.writeLockC
		if err := db.compTriggerWait(db.mcompCmdC); err != nil {
			return err
		}
	} else {
		<-db.writeLockC
	}

	// Table compaction.
	return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
}

// SetReadOnly makes the DB read-only. It will stay read-only until reopened.
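//
// A minimal usage sketch (hypothetical client code):
//
//	if err := db.SetReadOnly(); err != nil {
//		// handle error
//	}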
func (db *DB) SetReadOnly() error {
	if err := db.ok(); err != nil {
		return err
	}

	// Lock writer.
	select {
	case db.writeLockC <- struct{}{}:
		db.compWriteLocking = true
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Set compaction read-only.
	select {
	case db.compErrSetC <- ErrReadOnly:
	case perr := <-db.compPerErrC:
		return perr
	case <-db.closeC:
		return ErrClosed
	}

	return nil
}