1// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com> 2// All rights reserved. 3// 4// Use of this source code is governed by a BSD-style license that can be 5// found in the LICENSE file. 6 7package leveldb 8 9import ( 10 "bytes" 11 "fmt" 12 "sort" 13 "sync/atomic" 14 15 "github.com/syndtr/goleveldb/leveldb/cache" 16 "github.com/syndtr/goleveldb/leveldb/iterator" 17 "github.com/syndtr/goleveldb/leveldb/opt" 18 "github.com/syndtr/goleveldb/leveldb/storage" 19 "github.com/syndtr/goleveldb/leveldb/table" 20 "github.com/syndtr/goleveldb/leveldb/util" 21) 22 23// tFile holds basic information about a table. 24type tFile struct { 25 fd storage.FileDesc 26 seekLeft int32 27 size int64 28 imin, imax internalKey 29} 30 31// Returns true if given key is after largest key of this table. 32func (t *tFile) after(icmp *iComparer, ukey []byte) bool { 33 return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0 34} 35 36// Returns true if given key is before smallest key of this table. 37func (t *tFile) before(icmp *iComparer, ukey []byte) bool { 38 return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0 39} 40 41// Returns true if given key range overlaps with this table key range. 42func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool { 43 return !t.after(icmp, umin) && !t.before(icmp, umax) 44} 45 46// Cosumes one seek and return current seeks left. 47func (t *tFile) consumeSeek() int32 { 48 return atomic.AddInt32(&t.seekLeft, -1) 49} 50 51// Creates new tFile. 52func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile { 53 f := &tFile{ 54 fd: fd, 55 size: size, 56 imin: imin, 57 imax: imax, 58 } 59 60 // We arrange to automatically compact this file after 61 // a certain number of seeks. Let's assume: 62 // (1) One seek costs 10ms 63 // (2) Writing or reading 1MB costs 10ms (100MB/s) 64 // (3) A compaction of 1MB does 25MB of IO: 65 // 1MB read from this level 66 // 10-12MB read from next level (boundaries may be misaligned) 67 // 10-12MB written to next level 68 // This implies that 25 seeks cost the same as the compaction 69 // of 1MB of data. I.e., one seek costs approximately the 70 // same as the compaction of 40KB of data. We are a little 71 // conservative and allow approximately one seek for every 16KB 72 // of data before triggering a compaction. 73 f.seekLeft = int32(size / 16384) 74 if f.seekLeft < 100 { 75 f.seekLeft = 100 76 } 77 78 return f 79} 80 81func tableFileFromRecord(r atRecord) *tFile { 82 return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax) 83} 84 85// tFiles hold multiple tFile. 86type tFiles []*tFile 87 88func (tf tFiles) Len() int { return len(tf) } 89func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] } 90 91func (tf tFiles) nums() string { 92 x := "[ " 93 for i, f := range tf { 94 if i != 0 { 95 x += ", " 96 } 97 x += fmt.Sprint(f.fd.Num) 98 } 99 x += " ]" 100 return x 101} 102 103// Returns true if i smallest key is less than j. 104// This used for sort by key in ascending order. 105func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool { 106 a, b := tf[i], tf[j] 107 n := icmp.Compare(a.imin, b.imin) 108 if n == 0 { 109 return a.fd.Num < b.fd.Num 110 } 111 return n < 0 112} 113 114// Returns true if i file number is greater than j. 115// This used for sort by file number in descending order. 116func (tf tFiles) lessByNum(i, j int) bool { 117 return tf[i].fd.Num > tf[j].fd.Num 118} 119 120// Sorts tables by key in ascending order. 121func (tf tFiles) sortByKey(icmp *iComparer) { 122 sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp}) 123} 124 125// Sorts tables by file number in descending order. 126func (tf tFiles) sortByNum() { 127 sort.Sort(&tFilesSortByNum{tFiles: tf}) 128} 129 130// Returns sum of all tables size. 131func (tf tFiles) size() (sum int64) { 132 for _, t := range tf { 133 sum += t.size 134 } 135 return sum 136} 137 138// Searches smallest index of tables whose its smallest 139// key is after or equal with given key. 140func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int { 141 return sort.Search(len(tf), func(i int) bool { 142 return icmp.Compare(tf[i].imin, ikey) >= 0 143 }) 144} 145 146// Searches smallest index of tables whose its largest 147// key is after or equal with given key. 148func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int { 149 return sort.Search(len(tf), func(i int) bool { 150 return icmp.Compare(tf[i].imax, ikey) >= 0 151 }) 152} 153 154// Searches smallest index of tables whose its file number 155// is smaller than the given number. 156func (tf tFiles) searchNumLess(num int64) int { 157 return sort.Search(len(tf), func(i int) bool { 158 return tf[i].fd.Num < num 159 }) 160} 161 162// Searches smallest index of tables whose its smallest 163// key is after the given key. 164func (tf tFiles) searchMinUkey(icmp *iComparer, umin []byte) int { 165 return sort.Search(len(tf), func(i int) bool { 166 return icmp.ucmp.Compare(tf[i].imin.ukey(), umin) > 0 167 }) 168} 169 170// Searches smallest index of tables whose its largest 171// key is after the given key. 172func (tf tFiles) searchMaxUkey(icmp *iComparer, umax []byte) int { 173 return sort.Search(len(tf), func(i int) bool { 174 return icmp.ucmp.Compare(tf[i].imax.ukey(), umax) > 0 175 }) 176} 177 178// Returns true if given key range overlaps with one or more 179// tables key range. If unsorted is true then binary search will not be used. 180func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool { 181 if unsorted { 182 // Check against all files. 183 for _, t := range tf { 184 if t.overlaps(icmp, umin, umax) { 185 return true 186 } 187 } 188 return false 189 } 190 191 i := 0 192 if len(umin) > 0 { 193 // Find the earliest possible internal key for min. 194 i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek)) 195 } 196 if i >= len(tf) { 197 // Beginning of range is after all files, so no overlap. 198 return false 199 } 200 return !tf[i].before(icmp, umax) 201} 202 203// Returns tables whose its key range overlaps with given key range. 204// Range will be expanded if ukey found hop across tables. 205// If overlapped is true then the search will be restarted if umax 206// expanded. 207// The dst content will be overwritten. 208func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles { 209 // Short circuit if tf is empty 210 if len(tf) == 0 { 211 return nil 212 } 213 // For non-zero levels, there is no ukey hop across at all. 214 // And what's more, the files in these levels are strictly sorted, 215 // so use binary search instead of heavy traverse. 216 if !overlapped { 217 var begin, end int 218 // Determine the begin index of the overlapped file 219 if umin != nil { 220 index := tf.searchMinUkey(icmp, umin) 221 if index == 0 { 222 begin = 0 223 } else if bytes.Compare(tf[index-1].imax.ukey(), umin) >= 0 { 224 // The min ukey overlaps with the index-1 file, expand it. 225 begin = index - 1 226 } else { 227 begin = index 228 } 229 } 230 // Determine the end index of the overlapped file 231 if umax != nil { 232 index := tf.searchMaxUkey(icmp, umax) 233 if index == len(tf) { 234 end = len(tf) 235 } else if bytes.Compare(tf[index].imin.ukey(), umax) <= 0 { 236 // The max ukey overlaps with the index file, expand it. 237 end = index + 1 238 } else { 239 end = index 240 } 241 } else { 242 end = len(tf) 243 } 244 // Ensure the overlapped file indexes are valid. 245 if begin >= end { 246 return nil 247 } 248 dst = make([]*tFile, end-begin) 249 copy(dst, tf[begin:end]) 250 return dst 251 } 252 253 dst = dst[:0] 254 for i := 0; i < len(tf); { 255 t := tf[i] 256 if t.overlaps(icmp, umin, umax) { 257 if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 { 258 umin = t.imin.ukey() 259 dst = dst[:0] 260 i = 0 261 continue 262 } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 { 263 umax = t.imax.ukey() 264 // Restart search if it is overlapped. 265 dst = dst[:0] 266 i = 0 267 continue 268 } 269 270 dst = append(dst, t) 271 } 272 i++ 273 } 274 275 return dst 276} 277 278// Returns tables key range. 279func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) { 280 for i, t := range tf { 281 if i == 0 { 282 imin, imax = t.imin, t.imax 283 continue 284 } 285 if icmp.Compare(t.imin, imin) < 0 { 286 imin = t.imin 287 } 288 if icmp.Compare(t.imax, imax) > 0 { 289 imax = t.imax 290 } 291 } 292 293 return 294} 295 296// Creates iterator index from tables. 297func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer { 298 if slice != nil { 299 var start, limit int 300 if slice.Start != nil { 301 start = tf.searchMax(icmp, internalKey(slice.Start)) 302 } 303 if slice.Limit != nil { 304 limit = tf.searchMin(icmp, internalKey(slice.Limit)) 305 } else { 306 limit = tf.Len() 307 } 308 tf = tf[start:limit] 309 } 310 return iterator.NewArrayIndexer(&tFilesArrayIndexer{ 311 tFiles: tf, 312 tops: tops, 313 icmp: icmp, 314 slice: slice, 315 ro: ro, 316 }) 317} 318 319// Tables iterator index. 320type tFilesArrayIndexer struct { 321 tFiles 322 tops *tOps 323 icmp *iComparer 324 slice *util.Range 325 ro *opt.ReadOptions 326} 327 328func (a *tFilesArrayIndexer) Search(key []byte) int { 329 return a.searchMax(a.icmp, internalKey(key)) 330} 331 332func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator { 333 if i == 0 || i == a.Len()-1 { 334 return a.tops.newIterator(a.tFiles[i], a.slice, a.ro) 335 } 336 return a.tops.newIterator(a.tFiles[i], nil, a.ro) 337} 338 339// Helper type for sortByKey. 340type tFilesSortByKey struct { 341 tFiles 342 icmp *iComparer 343} 344 345func (x *tFilesSortByKey) Less(i, j int) bool { 346 return x.lessByKey(x.icmp, i, j) 347} 348 349// Helper type for sortByNum. 350type tFilesSortByNum struct { 351 tFiles 352} 353 354func (x *tFilesSortByNum) Less(i, j int) bool { 355 return x.lessByNum(i, j) 356} 357 358// Table operations. 359type tOps struct { 360 s *session 361 noSync bool 362 evictRemoved bool 363 cache *cache.Cache 364 bcache *cache.Cache 365 bpool *util.BufferPool 366} 367 368// Creates an empty table and returns table writer. 369func (t *tOps) create() (*tWriter, error) { 370 fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()} 371 fw, err := t.s.stor.Create(fd) 372 if err != nil { 373 return nil, err 374 } 375 return &tWriter{ 376 t: t, 377 fd: fd, 378 w: fw, 379 tw: table.NewWriter(fw, t.s.o.Options), 380 }, nil 381} 382 383// Builds table from src iterator. 384func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { 385 w, err := t.create() 386 if err != nil { 387 return 388 } 389 390 defer func() { 391 if err != nil { 392 w.drop() 393 } 394 }() 395 396 for src.Next() { 397 err = w.append(src.Key(), src.Value()) 398 if err != nil { 399 return 400 } 401 } 402 err = src.Error() 403 if err != nil { 404 return 405 } 406 407 n = w.tw.EntriesLen() 408 f, err = w.finish() 409 return 410} 411 412// Opens table. It returns a cache handle, which should 413// be released after use. 414func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) { 415 ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) { 416 var r storage.Reader 417 r, err = t.s.stor.Open(f.fd) 418 if err != nil { 419 return 0, nil 420 } 421 422 var bcache *cache.NamespaceGetter 423 if t.bcache != nil { 424 bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)} 425 } 426 427 var tr *table.Reader 428 tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options) 429 if err != nil { 430 r.Close() 431 return 0, nil 432 } 433 return 1, tr 434 435 }) 436 if ch == nil && err == nil { 437 err = ErrClosed 438 } 439 return 440} 441 442// Finds key/value pair whose key is greater than or equal to the 443// given key. 444func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) { 445 ch, err := t.open(f) 446 if err != nil { 447 return nil, nil, err 448 } 449 defer ch.Release() 450 return ch.Value().(*table.Reader).Find(key, true, ro) 451} 452 453// Finds key that is greater than or equal to the given key. 454func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) { 455 ch, err := t.open(f) 456 if err != nil { 457 return nil, err 458 } 459 defer ch.Release() 460 return ch.Value().(*table.Reader).FindKey(key, true, ro) 461} 462 463// Returns approximate offset of the given key. 464func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) { 465 ch, err := t.open(f) 466 if err != nil { 467 return 468 } 469 defer ch.Release() 470 return ch.Value().(*table.Reader).OffsetOf(key) 471} 472 473// Creates an iterator from the given table. 474func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { 475 ch, err := t.open(f) 476 if err != nil { 477 return iterator.NewEmptyIterator(err) 478 } 479 iter := ch.Value().(*table.Reader).NewIterator(slice, ro) 480 iter.SetReleaser(ch) 481 return iter 482} 483 484// Removes table from persistent storage. It waits until 485// no one use the the table. 486func (t *tOps) remove(fd storage.FileDesc) { 487 t.cache.Delete(0, uint64(fd.Num), func() { 488 if err := t.s.stor.Remove(fd); err != nil { 489 t.s.logf("table@remove removing @%d %q", fd.Num, err) 490 } else { 491 t.s.logf("table@remove removed @%d", fd.Num) 492 } 493 if t.evictRemoved && t.bcache != nil { 494 t.bcache.EvictNS(uint64(fd.Num)) 495 } 496 // Try to reuse file num, useful for discarded transaction. 497 t.s.reuseFileNum(fd.Num) 498 }) 499} 500 501// Closes the table ops instance. It will close all tables, 502// regadless still used or not. 503func (t *tOps) close() { 504 t.bpool.Close() 505 t.cache.Close() 506 if t.bcache != nil { 507 t.bcache.CloseWeak() 508 } 509} 510 511// Creates new initialized table ops instance. 512func newTableOps(s *session) *tOps { 513 var ( 514 cacher cache.Cacher 515 bcache *cache.Cache 516 bpool *util.BufferPool 517 ) 518 if s.o.GetOpenFilesCacheCapacity() > 0 { 519 cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity()) 520 } 521 if !s.o.GetDisableBlockCache() { 522 var bcacher cache.Cacher 523 if s.o.GetBlockCacheCapacity() > 0 { 524 bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity()) 525 } 526 bcache = cache.NewCache(bcacher) 527 } 528 if !s.o.GetDisableBufferPool() { 529 bpool = util.NewBufferPool(s.o.GetBlockSize() + 5) 530 } 531 return &tOps{ 532 s: s, 533 noSync: s.o.GetNoSync(), 534 evictRemoved: s.o.GetBlockCacheEvictRemoved(), 535 cache: cache.NewCache(cacher), 536 bcache: bcache, 537 bpool: bpool, 538 } 539} 540 541// tWriter wraps the table writer. It keep track of file descriptor 542// and added key range. 543type tWriter struct { 544 t *tOps 545 546 fd storage.FileDesc 547 w storage.Writer 548 tw *table.Writer 549 550 first, last []byte 551} 552 553// Append key/value pair to the table. 554func (w *tWriter) append(key, value []byte) error { 555 if w.first == nil { 556 w.first = append([]byte{}, key...) 557 } 558 w.last = append(w.last[:0], key...) 559 return w.tw.Append(key, value) 560} 561 562// Returns true if the table is empty. 563func (w *tWriter) empty() bool { 564 return w.first == nil 565} 566 567// Closes the storage.Writer. 568func (w *tWriter) close() { 569 if w.w != nil { 570 w.w.Close() 571 w.w = nil 572 } 573} 574 575// Finalizes the table and returns table file. 576func (w *tWriter) finish() (f *tFile, err error) { 577 defer w.close() 578 err = w.tw.Close() 579 if err != nil { 580 return 581 } 582 if !w.t.noSync { 583 err = w.w.Sync() 584 if err != nil { 585 return 586 } 587 } 588 f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last)) 589 return 590} 591 592// Drops the table. 593func (w *tWriter) drop() { 594 w.close() 595 w.t.s.stor.Remove(w.fd) 596 w.t.s.reuseFileNum(w.fd.Num) 597 w.tw = nil 598 w.first = nil 599 w.last = nil 600} 601