// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

package leveldb

import (
	"fmt"
	"sort"
	"sync/atomic"

	"github.com/syndtr/goleveldb/leveldb/cache"
	"github.com/syndtr/goleveldb/leveldb/iterator"
	"github.com/syndtr/goleveldb/leveldb/opt"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/table"
	"github.com/syndtr/goleveldb/leveldb/util"
)

// tFile holds basic information about a table.
type tFile struct {
	fd       storage.FileDesc // descriptor of the backing table file
	seekLeft int32            // remaining "free" seeks before compaction is warranted; accessed atomically
	size     int64            // table file size in bytes
	// Smallest and largest internal keys contained in the table.
	imin, imax internalKey
}

// Returns true if the given user key is after (greater than) the largest
// key of this table. A nil ukey means "unbounded" and yields false.
func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
	return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
}

// Returns true if the given user key is before (less than) the smallest
// key of this table. A nil ukey means "unbounded" and yields false.
func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
	return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
}

// Returns true if the given user-key range overlaps with this table's key
// range. Nil bounds are treated as unbounded on that side.
func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
	return !t.after(icmp, umin) && !t.before(icmp, umax)
}

// Consumes one seek and returns the number of seeks left.
func (t *tFile) consumeSeek() int32 {
	return atomic.AddInt32(&t.seekLeft, -1)
}

// Creates a new tFile covering the internal-key range [imin, imax].
func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
	f := &tFile{
		fd:   fd,
		size: size,
		imin: imin,
		imax: imax,
	}

	// We arrange to automatically compact this file after
	// a certain number of seeks. Let's assume:
	//   (1) One seek costs 10ms
	//   (2) Writing or reading 1MB costs 10ms (100MB/s)
	//   (3) A compaction of 1MB does 25MB of IO:
	//         1MB read from this level
	//         10-12MB read from next level (boundaries may be misaligned)
	//         10-12MB written to next level
	// This implies that 25 seeks cost the same as the compaction
	// of 1MB of data. I.e., one seek costs approximately the
	// same as the compaction of 40KB of data. We are a little
	// conservative and allow approximately one seek for every 16KB
	// of data before triggering a compaction.
	f.seekLeft = int32(size / 16384)
	if f.seekLeft < 100 {
		// Never allow fewer than 100 free seeks, even for tiny tables.
		f.seekLeft = 100
	}

	return f
}

// Builds a tFile from a version-edit "add table" record.
func tableFileFromRecord(r atRecord) *tFile {
	return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
}

// tFiles hold multiple tFile.
type tFiles []*tFile

func (tf tFiles) Len() int      { return len(tf) }
func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }

// Returns a human-readable list of the table file numbers,
// e.g. "[ 1, 2, 3 ]". Used for logging.
func (tf tFiles) nums() string {
	x := "[ "
	for i, f := range tf {
		if i != 0 {
			x += ", "
		}
		x += fmt.Sprint(f.fd.Num)
	}
	x += " ]"
	return x
}

// Returns true if table i's smallest key is less than table j's; ties on
// the smallest key are broken by file number.
// This is used for sorting by key in ascending order.
func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
	a, b := tf[i], tf[j]
	n := icmp.Compare(a.imin, b.imin)
	if n == 0 {
		// Same smallest key: order by file number for a stable total order.
		return a.fd.Num < b.fd.Num
	}
	return n < 0
}

// Returns true if table i's file number is greater than table j's.
// This is used for sorting by file number in descending order.
func (tf tFiles) lessByNum(i, j int) bool {
	return tf[i].fd.Num > tf[j].fd.Num
}

// Sorts tables by key in ascending order.
func (tf tFiles) sortByKey(icmp *iComparer) {
	sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
}

// Sorts tables by file number in descending order.
125func (tf tFiles) sortByNum() { 126 sort.Sort(&tFilesSortByNum{tFiles: tf}) 127} 128 129// Returns sum of all tables size. 130func (tf tFiles) size() (sum int64) { 131 for _, t := range tf { 132 sum += t.size 133 } 134 return sum 135} 136 137// Searches smallest index of tables whose its smallest 138// key is after or equal with given key. 139func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int { 140 return sort.Search(len(tf), func(i int) bool { 141 return icmp.Compare(tf[i].imin, ikey) >= 0 142 }) 143} 144 145// Searches smallest index of tables whose its largest 146// key is after or equal with given key. 147func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int { 148 return sort.Search(len(tf), func(i int) bool { 149 return icmp.Compare(tf[i].imax, ikey) >= 0 150 }) 151} 152 153// Returns true if given key range overlaps with one or more 154// tables key range. If unsorted is true then binary search will not be used. 155func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool { 156 if unsorted { 157 // Check against all files. 158 for _, t := range tf { 159 if t.overlaps(icmp, umin, umax) { 160 return true 161 } 162 } 163 return false 164 } 165 166 i := 0 167 if len(umin) > 0 { 168 // Find the earliest possible internal key for min. 169 i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek)) 170 } 171 if i >= len(tf) { 172 // Beginning of range is after all files, so no overlap. 173 return false 174 } 175 return !tf[i].before(icmp, umax) 176} 177 178// Returns tables whose its key range overlaps with given key range. 179// Range will be expanded if ukey found hop across tables. 180// If overlapped is true then the search will be restarted if umax 181// expanded. 182// The dst content will be overwritten. 
func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
	dst = dst[:0]
	for i := 0; i < len(tf); {
		t := tf[i]
		if t.overlaps(icmp, umin, umax) {
			if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
				// This table extends the range on the min side: widen umin
				// and restart from the beginning so that earlier tables are
				// re-checked against the expanded range.
				umin = t.imin.ukey()
				dst = dst[:0]
				i = 0
				continue
			} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
				// This table extends the range on the max side: widen umax.
				umax = t.imax.ukey()
				// Restart search if it is overlapped.
				if overlapped {
					dst = dst[:0]
					i = 0
					continue
				}
			}

			dst = append(dst, t)
		}
		i++
	}

	return dst
}

// Returns the key range (smallest and largest internal keys) covered by
// the tables. Returns zero-value keys for an empty tFiles.
func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
	for i, t := range tf {
		if i == 0 {
			// Seed the range with the first table's bounds.
			imin, imax = t.imin, t.imax
			continue
		}
		if icmp.Compare(t.imin, imin) < 0 {
			imin = t.imin
		}
		if icmp.Compare(t.imax, imax) > 0 {
			imax = t.imax
		}
	}

	return
}

// Creates an iterator indexer from the tables, optionally restricted to the
// given key range. When slice is non-nil, tf is narrowed (by binary search,
// so tf is presumably sorted by key — verify with callers) to the tables
// that may contain keys within the slice.
func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
	if slice != nil {
		var start, limit int
		if slice.Start != nil {
			start = tf.searchMax(icmp, internalKey(slice.Start))
		}
		if slice.Limit != nil {
			limit = tf.searchMin(icmp, internalKey(slice.Limit))
		} else {
			limit = tf.Len()
		}
		tf = tf[start:limit]
	}
	return iterator.NewArrayIndexer(&tFilesArrayIndexer{
		tFiles: tf,
		tops:   tops,
		icmp:   icmp,
		slice:  slice,
		ro:     ro,
	})
}

// Tables iterator index.
type tFilesArrayIndexer struct {
	tFiles
	tops  *tOps
	icmp  *iComparer
	slice *util.Range
	ro    *opt.ReadOptions
}

// Search returns the index of the first table whose largest key is greater
// than or equal to the given key.
func (a *tFilesArrayIndexer) Search(key []byte) int {
	return a.searchMax(a.icmp, internalKey(key))
}

// Get returns an iterator over table i. Only the first and last tables are
// bounded by the slice; interior tables lie entirely within it, so they
// get an unbounded iterator.
func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
	if i == 0 || i == a.Len()-1 {
		return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
	}
	return a.tops.newIterator(a.tFiles[i], nil, a.ro)
}

// Helper type for sortByKey.
type tFilesSortByKey struct {
	tFiles
	icmp *iComparer
}

func (x *tFilesSortByKey) Less(i, j int) bool {
	return x.lessByKey(x.icmp, i, j)
}

// Helper type for sortByNum.
type tFilesSortByNum struct {
	tFiles
}

func (x *tFilesSortByNum) Less(i, j int) bool {
	return x.lessByNum(i, j)
}

// Table operations.
type tOps struct {
	s            *session
	noSync       bool             // when true, skip Sync when finishing a table
	evictRemoved bool             // when true, evict block-cache entries of removed tables
	cache        *cache.Cache     // open-table (reader) cache
	bcache       *cache.Cache     // block cache; may be nil
	bpool        *util.BufferPool // shared buffer pool; may be nil
}

// Creates an empty table and returns a table writer for it. A fresh file
// number is allocated from the session.
func (t *tOps) create() (*tWriter, error) {
	fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
	fw, err := t.s.stor.Create(fd)
	if err != nil {
		return nil, err
	}
	return &tWriter{
		t:  t,
		fd: fd,
		w:  fw,
		tw: table.NewWriter(fw, t.s.o.Options),
	}, nil
}

// Builds a table from the src iterator, returning the resulting table file
// and the number of entries written. On any error the partially written
// table is dropped (file removed, file number reused).
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
	w, err := t.create()
	if err != nil {
		return
	}

	// The named result err makes this defer see failures from any of the
	// steps below and clean up the half-built table.
	defer func() {
		if err != nil {
			w.drop()
		}
	}()

	for src.Next() {
		err = w.append(src.Key(), src.Value())
		if err != nil {
			return
		}
	}
	err = src.Error()
	if err != nil {
		return
	}

	n = w.tw.EntriesLen()
	f, err = w.finish()
	return
}

// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
	// The setter closure runs only on a cache miss; it assigns the outer
	// named result err, which is how open failures propagate to the caller.
	ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
		var r storage.Reader
		r, err = t.s.stor.Open(f.fd)
		if err != nil {
			return 0, nil
		}

		// Give the table reader a view of the block cache namespaced by
		// this table's file number, if a block cache is configured.
		var bcache *cache.NamespaceGetter
		if t.bcache != nil {
			bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
		}

		var tr *table.Reader
		tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
		if err != nil {
			// Reader construction failed: don't leak the open file.
			r.Close()
			return 0, nil
		}
		return 1, tr

	})
	if ch == nil && err == nil {
		// A nil handle with no error means the cache has been closed.
		err = ErrClosed
	}
	return
}

// Finds the key/value pair whose key is greater than or equal to the
// given key.
func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
	ch, err := t.open(f)
	if err != nil {
		return nil, nil, err
	}
	defer ch.Release()
	return ch.Value().(*table.Reader).Find(key, true, ro)
}

// Finds the key that is greater than or equal to the given key.
func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
	ch, err := t.open(f)
	if err != nil {
		return nil, err
	}
	defer ch.Release()
	return ch.Value().(*table.Reader).FindKey(key, true, ro)
}

// Returns the approximate offset of the given key within the table file.
func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
	ch, err := t.open(f)
	if err != nil {
		return
	}
	defer ch.Release()
	return ch.Value().(*table.Reader).OffsetOf(key)
}

// Creates an iterator from the given table.
407func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { 408 ch, err := t.open(f) 409 if err != nil { 410 return iterator.NewEmptyIterator(err) 411 } 412 iter := ch.Value().(*table.Reader).NewIterator(slice, ro) 413 iter.SetReleaser(ch) 414 return iter 415} 416 417// Removes table from persistent storage. It waits until 418// no one use the the table. 419func (t *tOps) remove(f *tFile) { 420 t.cache.Delete(0, uint64(f.fd.Num), func() { 421 if err := t.s.stor.Remove(f.fd); err != nil { 422 t.s.logf("table@remove removing @%d %q", f.fd.Num, err) 423 } else { 424 t.s.logf("table@remove removed @%d", f.fd.Num) 425 } 426 if t.evictRemoved && t.bcache != nil { 427 t.bcache.EvictNS(uint64(f.fd.Num)) 428 } 429 }) 430} 431 432// Closes the table ops instance. It will close all tables, 433// regadless still used or not. 434func (t *tOps) close() { 435 t.bpool.Close() 436 t.cache.Close() 437 if t.bcache != nil { 438 t.bcache.CloseWeak() 439 } 440} 441 442// Creates new initialized table ops instance. 443func newTableOps(s *session) *tOps { 444 var ( 445 cacher cache.Cacher 446 bcache *cache.Cache 447 bpool *util.BufferPool 448 ) 449 if s.o.GetOpenFilesCacheCapacity() > 0 { 450 cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity()) 451 } 452 if !s.o.GetDisableBlockCache() { 453 var bcacher cache.Cacher 454 if s.o.GetBlockCacheCapacity() > 0 { 455 bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity()) 456 } 457 bcache = cache.NewCache(bcacher) 458 } 459 if !s.o.GetDisableBufferPool() { 460 bpool = util.NewBufferPool(s.o.GetBlockSize() + 5) 461 } 462 return &tOps{ 463 s: s, 464 noSync: s.o.GetNoSync(), 465 evictRemoved: s.o.GetBlockCacheEvictRemoved(), 466 cache: cache.NewCache(cacher), 467 bcache: bcache, 468 bpool: bpool, 469 } 470} 471 472// tWriter wraps the table writer. It keep track of file descriptor 473// and added key range. 
474type tWriter struct { 475 t *tOps 476 477 fd storage.FileDesc 478 w storage.Writer 479 tw *table.Writer 480 481 first, last []byte 482} 483 484// Append key/value pair to the table. 485func (w *tWriter) append(key, value []byte) error { 486 if w.first == nil { 487 w.first = append([]byte{}, key...) 488 } 489 w.last = append(w.last[:0], key...) 490 return w.tw.Append(key, value) 491} 492 493// Returns true if the table is empty. 494func (w *tWriter) empty() bool { 495 return w.first == nil 496} 497 498// Closes the storage.Writer. 499func (w *tWriter) close() { 500 if w.w != nil { 501 w.w.Close() 502 w.w = nil 503 } 504} 505 506// Finalizes the table and returns table file. 507func (w *tWriter) finish() (f *tFile, err error) { 508 defer w.close() 509 err = w.tw.Close() 510 if err != nil { 511 return 512 } 513 if !w.t.noSync { 514 err = w.w.Sync() 515 if err != nil { 516 return 517 } 518 } 519 f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last)) 520 return 521} 522 523// Drops the table. 524func (w *tWriter) drop() { 525 w.close() 526 w.t.s.stor.Remove(w.fd) 527 w.t.s.reuseFileNum(w.fd.Num) 528 w.tw = nil 529 w.first = nil 530 w.last = nil 531} 532