1package leveldb 2 3import ( 4 "encoding/binary" 5 "math/rand" 6 "reflect" 7 "sync" 8 "testing" 9 "time" 10 11 "github.com/onsi/gomega" 12 "github.com/syndtr/goleveldb/leveldb/storage" 13 "github.com/syndtr/goleveldb/leveldb/testutil" 14) 15 16type testFileRec struct { 17 level int 18 num int64 19} 20 21func TestVersionStaging(t *testing.T) { 22 gomega.RegisterTestingT(t) 23 stor := testutil.NewStorage() 24 defer stor.Close() 25 s, err := newSession(stor, nil) 26 if err != nil { 27 t.Fatal(err) 28 } 29 defer func() { 30 s.close() 31 s.release() 32 }() 33 34 v := newVersion(s) 35 v.newStaging() 36 37 tmp := make([]byte, 4) 38 mik := func(i uint64) []byte { 39 binary.BigEndian.PutUint32(tmp, uint32(i)) 40 return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal)) 41 } 42 43 for i, x := range []struct { 44 add, del []testFileRec 45 trivial bool 46 levels [][]int64 47 }{ 48 { 49 add: []testFileRec{ 50 {1, 1}, 51 }, 52 levels: [][]int64{ 53 {}, 54 {1}, 55 }, 56 }, 57 { 58 add: []testFileRec{ 59 {1, 1}, 60 }, 61 levels: [][]int64{ 62 {}, 63 {1}, 64 }, 65 }, 66 { 67 del: []testFileRec{ 68 {1, 1}, 69 }, 70 levels: [][]int64{}, 71 }, 72 { 73 add: []testFileRec{ 74 {0, 1}, 75 {0, 3}, 76 {0, 2}, 77 {2, 5}, 78 {1, 4}, 79 }, 80 levels: [][]int64{ 81 {3, 2, 1}, 82 {4}, 83 {5}, 84 }, 85 }, 86 { 87 add: []testFileRec{ 88 {1, 6}, 89 {2, 5}, 90 }, 91 del: []testFileRec{ 92 {0, 1}, 93 {0, 4}, 94 }, 95 levels: [][]int64{ 96 {3, 2}, 97 {4, 6}, 98 {5}, 99 }, 100 }, 101 { 102 del: []testFileRec{ 103 {0, 3}, 104 {0, 2}, 105 {1, 4}, 106 {1, 6}, 107 {2, 5}, 108 }, 109 levels: [][]int64{}, 110 }, 111 { 112 add: []testFileRec{ 113 {0, 1}, 114 }, 115 levels: [][]int64{ 116 {1}, 117 }, 118 }, 119 { 120 add: []testFileRec{ 121 {1, 2}, 122 }, 123 levels: [][]int64{ 124 {1}, 125 {2}, 126 }, 127 }, 128 { 129 add: []testFileRec{ 130 {0, 3}, 131 }, 132 levels: [][]int64{ 133 {3, 1}, 134 {2}, 135 }, 136 }, 137 { 138 add: []testFileRec{ 139 {6, 9}, 140 }, 141 levels: [][]int64{ 142 {3, 1}, 143 {2}, 144 {}, 145 {}, 146 {}, 147 {}, 148 {9}, 149 }, 150 }, 151 { 152 del: []testFileRec{ 153 {6, 9}, 154 }, 155 levels: [][]int64{ 156 {3, 1}, 157 {2}, 158 }, 159 }, 160 // memory compaction 161 { 162 add: []testFileRec{ 163 {0, 5}, 164 }, 165 trivial: true, 166 levels: [][]int64{ 167 {5, 3, 1}, 168 {2}, 169 }, 170 }, 171 // memory compaction 172 { 173 add: []testFileRec{ 174 {0, 4}, 175 }, 176 trivial: true, 177 levels: [][]int64{ 178 {5, 4, 3, 1}, 179 {2}, 180 }, 181 }, 182 // table compaction 183 { 184 add: []testFileRec{ 185 {1, 6}, 186 {1, 7}, 187 {1, 8}, 188 }, 189 del: []testFileRec{ 190 {0, 3}, 191 {0, 4}, 192 {0, 5}, 193 }, 194 trivial: true, 195 levels: [][]int64{ 196 {1}, 197 {2, 6, 7, 8}, 198 }, 199 }, 200 } { 201 rec := &sessionRecord{} 202 for _, f := range x.add { 203 ik := mik(uint64(f.num)) 204 rec.addTable(f.level, f.num, 1, ik, ik) 205 } 206 for _, f := range x.del { 207 rec.delTable(f.level, f.num) 208 } 209 vs := v.newStaging() 210 vs.commit(rec) 211 v = vs.finish(x.trivial) 212 if len(v.levels) != len(x.levels) { 213 t.Fatalf("#%d: invalid level count: want=%d got=%d", i, len(x.levels), len(v.levels)) 214 } 215 for j, want := range x.levels { 216 tables := v.levels[j] 217 if len(want) != len(tables) { 218 t.Fatalf("#%d.%d: invalid tables count: want=%d got=%d", i, j, len(want), len(tables)) 219 } 220 got := make([]int64, len(tables)) 221 for k, t := range tables { 222 got[k] = t.fd.Num 223 } 224 if !reflect.DeepEqual(want, got) { 225 t.Fatalf("#%d.%d: invalid tables: want=%v got=%v", i, j, want, got) 226 } 227 } 228 } 229} 230 231func TestVersionReference(t *testing.T) { 232 gomega.RegisterTestingT(t) 233 stor := testutil.NewStorage() 234 defer stor.Close() 235 s, err := newSession(stor, nil) 236 if err != nil { 237 t.Fatal(err) 238 } 239 defer func() { 240 s.close() 241 s.release() 242 }() 243 244 tmp := make([]byte, 4) 245 mik := func(i uint64) []byte { 246 binary.BigEndian.PutUint32(tmp, uint32(i)) 247 return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal)) 248 } 249 250 // Test normal version task correctness 251 refc := make(chan map[int64]int) 252 253 for i, x := range []struct { 254 add, del []testFileRec 255 expect map[int64]int 256 failed bool 257 }{ 258 { 259 []testFileRec{{0, 1}, {0, 2}}, 260 nil, 261 map[int64]int{1: 1, 2: 1}, 262 false, 263 }, 264 { 265 []testFileRec{{0, 3}, {0, 4}}, 266 []testFileRec{{0, 1}}, 267 map[int64]int{2: 1, 3: 1, 4: 1}, 268 false, 269 }, 270 { 271 []testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}}, 272 []testFileRec{{0, 2}, {0, 3}, {0, 4}}, 273 map[int64]int{1: 1, 5: 1, 6: 1, 7: 1}, 274 false, 275 }, 276 { 277 nil, 278 nil, 279 map[int64]int{1: 1, 5: 1, 6: 1, 7: 1}, 280 true, 281 }, 282 { 283 []testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}}, 284 nil, 285 map[int64]int{1: 2, 5: 2, 6: 2, 7: 2}, 286 false, 287 }, 288 { 289 nil, 290 []testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}}, 291 map[int64]int{1: 1, 5: 1, 6: 1, 7: 1}, 292 false, 293 }, 294 { 295 []testFileRec{{0, 0}}, 296 []testFileRec{{0, 1}, {0, 5}, {0, 6}, {0, 7}}, 297 map[int64]int{0: 1}, 298 false, 299 }, 300 } { 301 rec := &sessionRecord{} 302 for n, f := range x.add { 303 rec.addTable(f.level, f.num, 1, mik(uint64(i+n)), mik(uint64(i+n))) 304 } 305 for _, f := range x.del { 306 rec.delTable(f.level, f.num) 307 } 308 309 // Simulate some read operations 310 var wg sync.WaitGroup 311 readN := rand.Intn(300) 312 for i := 0; i < readN; i++ { 313 wg.Add(1) 314 go func() { 315 v := s.version() 316 time.Sleep(time.Millisecond * time.Duration(rand.Intn(300))) 317 v.release() 318 wg.Done() 319 }() 320 } 321 322 v := s.version() 323 vs := v.newStaging() 324 vs.commit(rec) 325 nv := vs.finish(false) 326 327 if x.failed { 328 s.abandon <- nv.id 329 } else { 330 s.setVersion(rec, nv) 331 } 332 v.release() 333 334 // Wait all read operations 335 wg.Wait() 336 337 time.Sleep(100 * time.Millisecond) // Wait lazy reference finish tasks 338 339 s.fileRefCh <- refc 340 ref := <-refc 341 if !reflect.DeepEqual(ref, x.expect) { 342 t.Errorf("case %d failed, file reference mismatch, GOT %v, WANT %v", i, ref, x.expect) 343 } 344 } 345 346 // Test version task overflow 347 var longV = s.version() // This version is held by some long-time operation 348 var exp = map[int64]int{0: 1, maxCachedNumber: 1} 349 for i := 1; i <= maxCachedNumber; i++ { 350 rec := &sessionRecord{} 351 rec.addTable(0, int64(i), 1, mik(uint64(i)), mik(uint64(i))) 352 rec.delTable(0, int64(i-1)) 353 v := s.version() 354 vs := v.newStaging() 355 vs.commit(rec) 356 nv := vs.finish(false) 357 s.setVersion(rec, nv) 358 v.release() 359 } 360 time.Sleep(100 * time.Millisecond) // Wait lazy reference finish tasks 361 362 s.fileRefCh <- refc 363 ref := <-refc 364 if !reflect.DeepEqual(exp, ref) { 365 t.Errorf("file reference mismatch, GOT %v, WANT %v", ref, exp) 366 } 367 368 longV.release() 369 s.fileRefCh <- refc 370 ref = <-refc 371 delete(exp, 0) 372 if !reflect.DeepEqual(exp, ref) { 373 t.Errorf("file reference mismatch, GOT %v, WANT %v", ref, exp) 374 } 375} 376 377func BenchmarkVersionStagingNonTrivial(b *testing.B) { 378 benchmarkVersionStaging(b, false, 100000) 379} 380 381func BenchmarkVersionStagingTrivial(b *testing.B) { 382 benchmarkVersionStaging(b, true, 100000) 383} 384 385func benchmarkVersionStaging(b *testing.B, trivial bool, size int) { 386 stor := storage.NewMemStorage() 387 defer stor.Close() 388 s, err := newSession(stor, nil) 389 if err != nil { 390 b.Fatal(err) 391 } 392 defer func() { 393 s.close() 394 s.release() 395 }() 396 397 tmp := make([]byte, 4) 398 mik := func(i uint64) []byte { 399 binary.BigEndian.PutUint32(tmp, uint32(i)) 400 return []byte(makeInternalKey(nil, tmp, 0, keyTypeVal)) 401 } 402 403 rec := &sessionRecord{} 404 for i := 0; i < size; i++ { 405 ik := mik(uint64(i)) 406 rec.addTable(1, int64(i), 1, ik, ik) 407 } 408 409 v := newVersion(s) 410 vs := v.newStaging() 411 vs.commit(rec) 412 v = vs.finish(false) 413 414 b.ResetTimer() 415 b.ReportAllocs() 416 417 for i := 0; i < b.N; i++ { 418 rec := &sessionRecord{} 419 index := rand.Intn(size) 420 ik := mik(uint64(index)) 421 422 cnt := 0 423 for j := index; j < size && cnt <= 3; j++ { 424 rec.addTable(1, int64(i), 1, ik, ik) 425 cnt += 1 426 } 427 vs := v.newStaging() 428 vs.commit(rec) 429 vs.finish(trivial) 430 } 431} 432