// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"time"

	"github.com/syndtr/goleveldb/leveldb/memdb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// writeJournal writes batches, stamped with the starting sequence number
// seq, as a single journal record. If sync is true the journal writer is
// synced to stable storage before returning.
func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
	wr, err := db.journal.Next()
	if err != nil {
		return err
	}
	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
		return err
	}
	if err := db.journal.Flush(); err != nil {
		return err
	}
	if sync {
		return db.journalWriter.Sync()
	}
	return nil
}

// rotateMem waits for any pending memdb compaction, creates a fresh memdb
// (sized for at least n additional bytes) together with a new journal, and
// schedules compaction of the frozen memdb. If wait is true it blocks until
// that compaction has finished; otherwise the compaction is only triggered.
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
	// Wait for pending memdb compaction.
	err = db.compTriggerWait(db.mcompCmdC)
	if err != nil {
		return
	}

	// Create new memdb and journal.
	mem, err = db.newMem(n)
	if err != nil {
		return
	}

	// Schedule memdb compaction.
	if wait {
		err = db.compTriggerWait(db.mcompCmdC)
	} else {
		db.compTrigger(db.mcompCmdC)
	}
	return
}

// flush returns a memdb (refcount incremented; the caller must decref) with
// room for at least n more bytes, rotating to a fresh memdb when the current
// one is too full. It also throttles writes based on the number of level-0
// tables: at the slowdown trigger the write is delayed once by 1ms, and at
// the pause trigger it blocks until a table compaction completes. mdbFree
// reports the returned memdb's free space.
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
	delayed := false
	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
	flush := func() (retry bool) {
		mdb = db.getEffectiveMem()
		if mdb == nil {
			err = ErrClosed
			return false
		}
		defer func() {
			// On retry, drop the reference so the next iteration picks up
			// a fresh, possibly rotated, memdb.
			if retry {
				mdb.decref()
				mdb = nil
			}
		}()
		tLen := db.s.tLen(0)
		mdbFree = mdb.Free()
		switch {
		case tLen >= slowdownTrigger && !delayed:
			// Level-0 is getting full: slow this write down once by 1ms.
			delayed = true
			time.Sleep(time.Millisecond)
		case mdbFree >= n:
			// Current memdb has enough room; done.
			return false
		case tLen >= pauseTrigger:
			// Level-0 hard limit reached: block until table compaction.
			delayed = true
			err = db.compTriggerWait(db.tcompCmdC)
			if err != nil {
				return false
			}
		default:
			// Allow memdb to grow if it has no entry.
			if mdb.Len() == 0 {
				mdbFree = n
			} else {
				mdb.decref()
				mdb, err = db.rotateMem(n, false)
				if err == nil {
					mdbFree = mdb.Free()
				} else {
					mdbFree = 0
				}
			}
			return false
		}
		return true
	}
	start := time.Now()
	for flush() {
	}
	if delayed {
		// Accumulate write-delay statistics for this throttled write.
		db.writeDelay += time.Since(start)
		db.writeDelayN++
	} else if db.writeDelayN > 0 {
		// First unthrottled write after a run of delayed ones: log and reset.
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
		db.writeDelay = 0
		db.writeDelayN = 0
	}
	return
}

// writeMerge describes a write that asks to be merged into another
// in-flight write: either a whole batch, or a single record
// (keyType plus key/value) for a put/delete.
type writeMerge struct {
	sync       bool
	batch      *Batch
	keyType    keyType
	key, value []byte
}

// unlockWrite finishes a (possibly merged) write: it acks the merged
// writers with err, then either hands the write lock to the next writer
// that failed to merge (overflow) or releases the lock entirely.
func (db *DB) unlockWrite(overflow bool, merged int, err error) {
	for i := 0; i < merged; i++ {
		db.writeAckC <- err
	}
	if overflow {
		// Pass lock to the next write (that failed to merge).
		db.writeMergedC <- false
	} else {
		// Release lock.
		<-db.writeLockC
	}
}

// writeLocked performs the write while holding the write lock: it
// optionally merges queued concurrent writes into this one (up to a size
// limit), journals all batches with a single sequence stamp, applies them
// to the memdb, and advances the sequence number. ourBatch if defined
// should equal with batch; it is a pool-owned batch that merged single
// records are appended to.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
	// Try to flush memdb. This method would also trying to throttle writes
	// if it is too fast and compaction cannot catch-up.
	mdb, mdbFree, err := db.flush(batch.internalLen)
	if err != nil {
		db.unlockWrite(false, 0, err)
		return err
	}
	defer mdb.decref()

	var (
		overflow bool
		merged   int
		batches  = []*Batch{batch}
	)

	if merge {
		// Merge limit.
		var mergeLimit int
		if batch.internalLen > 128<<10 {
			mergeLimit = (1 << 20) - batch.internalLen
		} else {
			mergeLimit = 128 << 10
		}
		mergeCap := mdbFree - batch.internalLen
		if mergeLimit > mergeCap {
			mergeLimit = mergeCap
		}

	merge:
		for mergeLimit > 0 {
			select {
			case incoming := <-db.writeMergeC:
				if incoming.batch != nil {
					// Merge batch.
					if incoming.batch.internalLen > mergeLimit {
						overflow = true
						break merge
					}
					batches = append(batches, incoming.batch)
					mergeLimit -= incoming.batch.internalLen
				} else {
					// Merge put.
					internalLen := len(incoming.key) + len(incoming.value) + 8
					if internalLen > mergeLimit {
						overflow = true
						break merge
					}
					if ourBatch == nil {
						ourBatch = db.batchPool.Get().(*Batch)
						ourBatch.Reset()
						batches = append(batches, ourBatch)
					}
					// We can use same batch since concurrent write doesn't
					// guarantee write order.
					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
					mergeLimit -= internalLen
				}
				sync = sync || incoming.sync
				merged++
				db.writeMergedC <- true

			default:
				break merge
			}
		}
	}

	// Seq number.
	seq := db.seq + 1

	// Write journal.
	if err := db.writeJournal(batches, seq, sync); err != nil {
		db.unlockWrite(overflow, merged, err)
		return err
	}

	// Put batches.
	for _, batch := range batches {
		if err := batch.putMem(seq, mdb.DB); err != nil {
			// The journal write already succeeded; a memdb apply failure
			// here cannot be rolled back, hence the panic.
			panic(err)
		}
		seq += uint64(batch.Len())
	}

	// Incr seq number.
	db.addSeq(uint64(batchesLen(batches)))

	// Rotate memdb if it's reach the threshold.
	if batch.internalLen >= mdbFree {
		db.rotateMem(0, false)
	}

	db.unlockWrite(overflow, merged, nil)
	return nil
}

// Write apply the given batch to the DB. The batch records will be applied
// sequentially. Write might be used concurrently, when used concurrently and
// batch is small enough, write will try to merge the batches. Set NoWriteMerge
// option to true to disable write merge.
//
// It is safe to modify the contents of the arguments after Write returns but
// not before. Write will not modify content of the batch.
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
	// Nil or empty batches are a no-op; db.ok() errors take precedence.
	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
		return err
	}

	// If the batch size is larger than write buffer, it may justified to write
	// using transaction instead. Using transaction the batch will be written
	// into tables directly, skipping the journaling.
	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
		tr, err := db.OpenTransaction()
		if err != nil {
			return err
		}
		if err := tr.Write(batch, wo); err != nil {
			tr.Discard()
			return err
		}
		return tr.Commit()
	}

	// Per-write options, masked by DB-wide options.
	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
			if <-db.writeMergedC {
				// Write is merged.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	return db.writeLocked(batch, nil, merge, sync)
}

// putRec applies a single put or delete record (selected by kt) to the DB,
// following the same lock-acquisition and write-merge protocol as Write.
// When the record cannot be merged into another in-flight write, it is
// wrapped in a pool-owned batch and written via writeLocked.
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil {
		return err
	}

	// Per-write options, masked by DB-wide options.
	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
			if <-db.writeMergedC {
				// Write is merged.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	batch := db.batchPool.Get().(*Batch)
	batch.Reset()
	batch.appendRec(kt, key, value)
	return db.writeLocked(batch, batch, merge, sync)
}

// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. Write merge also applies for Put, see
// Write.
//
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeVal, key, value, wo)
}

// Delete deletes the value for the given key. Delete will not returns error if
// key doesn't exist. Write merge also applies for Delete, see Write.
//
// It is safe to modify the contents of the arguments after Delete returns but
// not before.
359func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { 360 return db.putRec(keyTypeDel, key, nil, wo) 361} 362 363func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { 364 iter := mem.NewIterator(nil) 365 defer iter.Release() 366 return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) && 367 (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0)) 368} 369 370// CompactRange compacts the underlying DB for the given key range. 371// In particular, deleted and overwritten versions are discarded, 372// and the data is rearranged to reduce the cost of operations 373// needed to access the data. This operation should typically only 374// be invoked by users who understand the underlying implementation. 375// 376// A nil Range.Start is treated as a key before all keys in the DB. 377// And a nil Range.Limit is treated as a key after all keys in the DB. 378// Therefore if both is nil then it will compact entire DB. 379func (db *DB) CompactRange(r util.Range) error { 380 if err := db.ok(); err != nil { 381 return err 382 } 383 384 // Lock writer. 385 select { 386 case db.writeLockC <- struct{}{}: 387 case err := <-db.compPerErrC: 388 return err 389 case <-db.closeC: 390 return ErrClosed 391 } 392 393 // Check for overlaps in memdb. 394 mdb := db.getEffectiveMem() 395 if mdb == nil { 396 return ErrClosed 397 } 398 defer mdb.decref() 399 if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { 400 // Memdb compaction. 401 if _, err := db.rotateMem(0, false); err != nil { 402 <-db.writeLockC 403 return err 404 } 405 <-db.writeLockC 406 if err := db.compTriggerWait(db.mcompCmdC); err != nil { 407 return err 408 } 409 } else { 410 <-db.writeLockC 411 } 412 413 // Table compaction. 414 return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit) 415} 416 417// SetReadOnly makes DB read-only. It will stay read-only until reopened. 
418func (db *DB) SetReadOnly() error { 419 if err := db.ok(); err != nil { 420 return err 421 } 422 423 // Lock writer. 424 select { 425 case db.writeLockC <- struct{}{}: 426 db.compWriteLocking = true 427 case err := <-db.compPerErrC: 428 return err 429 case <-db.closeC: 430 return ErrClosed 431 } 432 433 // Set compaction read-only. 434 select { 435 case db.compErrSetC <- ErrReadOnly: 436 case perr := <-db.compPerErrC: 437 return perr 438 case <-db.closeC: 439 return ErrClosed 440 } 441 442 return nil 443} 444