// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"sync/atomic"
	"time"

	"github.com/syndtr/goleveldb/leveldb/memdb"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// writeJournal writes the given batches, starting at sequence number seq,
// as a single journal record. The journal is flushed before returning and,
// if sync is true, the underlying journal file is also synced to stable
// storage.
func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
	wr, err := db.journal.Next()
	if err != nil {
		return err
	}
	if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
		return err
	}
	if err := db.journal.Flush(); err != nil {
		return err
	}
	if sync {
		return db.journalWriter.Sync()
	}
	return nil
}

// rotateMem freezes the current memdb and installs a fresh memdb/journal
// pair with room for at least n bytes. If wait is true it blocks until the
// frozen memdb has been compacted; otherwise compaction is only scheduled.
// NOTE(review): callers appear to invoke this while holding the write lock
// — confirm that invariant before adding new call sites.
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
	retryLimit := 3
retry:
	// Wait for pending memdb compaction.
	err = db.compTriggerWait(db.mcompCmdC)
	if err != nil {
		return
	}
	retryLimit--

	// Create new memdb and journal.
	mem, err = db.newMem(n)
	if err != nil {
		if err == errHasFrozenMem {
			// A frozen memdb still exists even though the compaction wait
			// above returned; retry the wait. After three attempts this is
			// treated as a logic error rather than a transient condition.
			if retryLimit <= 0 {
				panic("BUG: still has frozen memdb")
			}
			goto retry
		}
		return
	}

	// Schedule memdb compaction.
	if wait {
		err = db.compTriggerWait(db.mcompCmdC)
	} else {
		db.compTrigger(db.mcompCmdC)
	}
	return
}

// flush returns the effective memdb (referenced; the caller must decref it)
// along with its free space in bytes, ensuring it can hold at least n more
// bytes — rotating to a fresh memdb when needed. It also throttles writes
// when level-0 table compaction cannot keep up: a one-millisecond sleep at
// the slowdown trigger, and a full pause (waiting for table compaction) at
// the pause trigger.
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
	delayed := false
	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
	flush := func() (retry bool) {
		mdb = db.getEffectiveMem()
		if mdb == nil {
			err = ErrClosed
			return false
		}
		// On retry the memdb reference is released here; a fresh one is
		// acquired at the top of the next iteration.
		defer func() {
			if retry {
				mdb.decref()
				mdb = nil
			}
		}()
		tLen := db.s.tLen(0)
		mdbFree = mdb.Free()
		switch {
		case tLen >= slowdownTrigger && !delayed:
			// Too many level-0 tables: slow this writer down once, then
			// re-evaluate.
			delayed = true
			time.Sleep(time.Millisecond)
		case mdbFree >= n:
			// Enough room in the current memdb; done.
			return false
		case tLen >= pauseTrigger:
			// Level-0 is critically full: block until table compaction
			// makes progress.
			delayed = true
			// Set the write paused flag explicitly.
			atomic.StoreInt32(&db.inWritePaused, 1)
			err = db.compTriggerWait(db.tcompCmdC)
			// Unset the write paused flag.
			atomic.StoreInt32(&db.inWritePaused, 0)
			if err != nil {
				return false
			}
		default:
			// Allow memdb to grow if it has no entry.
			if mdb.Len() == 0 {
				mdbFree = n
			} else {
				// Memdb is full: rotate to a fresh one sized for n bytes.
				mdb.decref()
				mdb, err = db.rotateMem(n, false)
				if err == nil {
					mdbFree = mdb.Free()
				} else {
					mdbFree = 0
				}
			}
			return false
		}
		return true
	}
	start := time.Now()
	for flush() {
	}
	// Accumulate delay stats while delays are ongoing; once a write goes
	// through undelayed, publish and reset the counters.
	if delayed {
		db.writeDelay += time.Since(start)
		db.writeDelayN++
	} else if db.writeDelayN > 0 {
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
		atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
		atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
		db.writeDelay = 0
		db.writeDelayN = 0
	}
	return
}

// writeMerge is the request a concurrent writer sends on db.writeMergeC to
// ask the current write-lock holder to merge its write. Exactly one form is
// used: either batch is non-nil (merge a whole batch), or batch is nil and
// keyType/key/value describe a single record.
type writeMerge struct {
	sync       bool
	batch      *Batch
	keyType    keyType
	key, value []byte
}

// unlockWrite completes a locked write: it acknowledges the merged writers
// (each receives err on writeAckC), then either hands the write lock
// directly to the writer that failed to merge (overflow) via writeMergedC,
// or releases the lock by draining writeLockC.
func (db *DB) unlockWrite(overflow bool, merged int, err error) {
	for i := 0; i < merged; i++ {
		db.writeAckC <- err
	}
	if overflow {
		// Pass lock to the next write (that failed to merge).
		db.writeMergedC <- false
	} else {
		// Release lock.
		<-db.writeLockC
	}
}

// writeLocked applies batch to the journal and memdb while holding the
// write lock, optionally merging in writes from concurrent writers, and
// releases (or hands over) the lock before returning.
// ourBatch is batch that we can modify; it may be nil, in which case a
// pooled batch is lazily acquired when single-record merges arrive.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
	// Try to flush memdb. This method would also trying to throttle writes
	// if it is too fast and compaction cannot catch-up.
	mdb, mdbFree, err := db.flush(batch.internalLen)
	if err != nil {
		db.unlockWrite(false, 0, err)
		return err
	}
	defer mdb.decref()

	var (
		overflow bool
		merged   int
		batches  = []*Batch{batch}
	)

	if merge {
		// Merge limit: cap the total merged size at 1 MiB (128 KiB when the
		// initiating batch is small), bounded by the memdb's remaining space.
		var mergeLimit int
		if batch.internalLen > 128<<10 {
			mergeLimit = (1 << 20) - batch.internalLen
		} else {
			mergeLimit = 128 << 10
		}
		mergeCap := mdbFree - batch.internalLen
		if mergeLimit > mergeCap {
			mergeLimit = mergeCap
		}

	merge:
		// Non-blocking drain of pending merge requests until the limit is
		// reached or no writer is waiting.
		for mergeLimit > 0 {
			select {
			case incoming := <-db.writeMergeC:
				if incoming.batch != nil {
					// Merge batch.
					if incoming.batch.internalLen > mergeLimit {
						// Doesn't fit; this writer inherits the lock via
						// unlockWrite(overflow=true).
						overflow = true
						break merge
					}
					batches = append(batches, incoming.batch)
					mergeLimit -= incoming.batch.internalLen
				} else {
					// Merge put.
					internalLen := len(incoming.key) + len(incoming.value) + 8
					if internalLen > mergeLimit {
						overflow = true
						break merge
					}
					if ourBatch == nil {
						ourBatch = db.batchPool.Get().(*Batch)
						ourBatch.Reset()
						batches = append(batches, ourBatch)
					}
					// We can use same batch since concurrent write doesn't
					// guarantee write order.
					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
					mergeLimit -= internalLen
				}
				sync = sync || incoming.sync
				merged++
				// Tell the merged writer its write was accepted; it will
				// wait for the result on writeAckC.
				db.writeMergedC <- true

			default:
				break merge
			}
		}
	}

	// Release ourBatch if any.
	if ourBatch != nil {
		defer db.batchPool.Put(ourBatch)
	}

	// Seq number.
	seq := db.seq + 1

	// Write journal.
	if err := db.writeJournal(batches, seq, sync); err != nil {
		db.unlockWrite(overflow, merged, err)
		return err
	}

	// Put batches.
	for _, batch := range batches {
		if err := batch.putMem(seq, mdb.DB); err != nil {
			// Memdb insertion cannot fail for well-formed records after the
			// journal accepted them; treat failure as a programmer bug.
			panic(err)
		}
		seq += uint64(batch.Len())
	}

	// Incr seq number.
	db.addSeq(uint64(batchesLen(batches)))

	// Rotate memdb if it's reach the threshold.
	if batch.internalLen >= mdbFree {
		// NOTE(review): the rotation error is deliberately(?) ignored here;
		// presumably a failure resurfaces on the next write's flush — confirm.
		db.rotateMem(0, false)
	}

	db.unlockWrite(overflow, merged, nil)
	return nil
}

// Write apply the given batch to the DB. The batch records will be applied
// sequentially. Write might be used concurrently, when used concurrently and
// batch is small enough, write will try to merge the batches. Set NoWriteMerge
// option to true to disable write merge.
//
// It is safe to modify the contents of the arguments after Write returns but
// not before. Write will not modify content of the batch.
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
		return err
	}

	// If the batch size is larger than write buffer, it may justified to write
	// using transaction instead. Using transaction the batch will be written
	// into tables directly, skipping the journaling.
	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
		tr, err := db.OpenTransaction()
		if err != nil {
			return err
		}
		if err := tr.Write(batch, wo); err != nil {
			tr.Discard()
			return err
		}
		return tr.Commit()
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
			if <-db.writeMergedC {
				// Write is merged.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	return db.writeLocked(batch, nil, merge, sync)
}

// putRec writes a single record of the given type. It mirrors Write's lock
// acquisition (including the merge handshake), then builds a one-record
// batch from the pool and applies it via writeLocked, which also returns
// the pooled batch.
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil {
		return err
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire write lock.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
			if <-db.writeMergedC {
				// Write is merged.
				return <-db.writeAckC
			}
			// Write is not merged, the write lock is handed to us. Continue.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction error.
			return err
		case <-db.closeC:
			// Closed
			return ErrClosed
		}
	}

	batch := db.batchPool.Get().(*Batch)
	batch.Reset()
	batch.appendRec(kt, key, value)
	return db.writeLocked(batch, batch, merge, sync)
}

// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. Write merge also applies for Put, see
// Write.
//
// It is safe to modify the contents of the arguments after Put returns but not
// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeVal, key, value, wo)
}

// Delete deletes the value for the given key. Delete will not returns error if
// key doesn't exist. Write merge also applies for Delete, see Write.
//
// It is safe to modify the contents of the arguments after Delete returns but
// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeDel, key, nil, wo)
}

// isMemOverlaps reports whether the user-key range [min, max] overlaps the
// keys stored in mem. A nil min or max acts as an open bound on that side;
// an empty memdb never overlaps.
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
	iter := mem.NewIterator(nil)
	defer iter.Release()
	return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
		(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
}

// CompactRange compacts the underlying DB for the given key range.
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// A nil Range.Start is treated as a key before all keys in the DB.
// And a nil Range.Limit is treated as a key after all keys in the DB.
// Therefore if both is nil then it will compact entire DB.
func (db *DB) CompactRange(r util.Range) error {
	if err := db.ok(); err != nil {
		return err
	}

	// Lock writer.
	select {
	case db.writeLockC <- struct{}{}:
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Check for overlaps in memdb.
	mdb := db.getEffectiveMem()
	if mdb == nil {
		// NOTE(review): the write lock acquired above is not released on
		// this path. Presumably harmless because a nil effective memdb
		// means the DB is closed and closeC unblocks other waiters — but
		// worth confirming.
		return ErrClosed
	}
	defer mdb.decref()
	if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
		// Memdb compaction: rotate first so the overlapping entries reach a
		// table, then wait for the memdb compaction to finish.
		if _, err := db.rotateMem(0, false); err != nil {
			<-db.writeLockC
			return err
		}
		<-db.writeLockC
		if err := db.compTriggerWait(db.mcompCmdC); err != nil {
			return err
		}
	} else {
		<-db.writeLockC
	}

	// Table compaction.
	return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
}

// SetReadOnly makes DB read-only. It will stay read-only until reopened.
func (db *DB) SetReadOnly() error {
	if err := db.ok(); err != nil {
		return err
	}

	// Lock writer. The lock is intentionally retained (compWriteLocking is
	// set) so no further writes can acquire it; NOTE(review): it is
	// presumably released elsewhere on close — confirm.
	select {
	case db.writeLockC <- struct{}{}:
		db.compWriteLocking = true
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Set compaction read-only.
	select {
	case db.compErrSetC <- ErrReadOnly:
	case perr := <-db.compPerErrC:
		return perr
	case <-db.closeC:
		return ErrClosed
	}

	return nil
}