1// Copyright (c) The Thanos Authors. 2// Licensed under the Apache License 2.0. 3 4package indexheader 5 6import ( 7 "context" 8 "fmt" 9 "io/ioutil" 10 "math" 11 "os" 12 "path/filepath" 13 "strconv" 14 "testing" 15 16 "github.com/go-kit/kit/log" 17 "github.com/oklog/ulid" 18 "github.com/pkg/errors" 19 "github.com/prometheus/prometheus/pkg/labels" 20 "github.com/prometheus/prometheus/tsdb/encoding" 21 "github.com/prometheus/prometheus/tsdb/fileutil" 22 "github.com/prometheus/prometheus/tsdb/index" 23 24 "github.com/thanos-io/thanos/pkg/block" 25 "github.com/thanos-io/thanos/pkg/block/metadata" 26 "github.com/thanos-io/thanos/pkg/objstore" 27 "github.com/thanos-io/thanos/pkg/objstore/filesystem" 28 "github.com/thanos-io/thanos/pkg/testutil" 29 "github.com/thanos-io/thanos/pkg/testutil/e2eutil" 30) 31 32func TestReaders(t *testing.T) { 33 ctx := context.Background() 34 35 tmpDir, err := ioutil.TempDir("", "test-indexheader") 36 testutil.Ok(t, err) 37 defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() 38 39 bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) 40 testutil.Ok(t, err) 41 defer func() { testutil.Ok(t, bkt.Close()) }() 42 43 // Create block index version 2. 44 id1, err := e2eutil.CreateBlock(ctx, tmpDir, []labels.Labels{ 45 {{Name: "a", Value: "1"}}, 46 {{Name: "a", Value: "2"}}, 47 {{Name: "a", Value: "3"}}, 48 {{Name: "a", Value: "4"}}, 49 {{Name: "a", Value: "5"}}, 50 {{Name: "a", Value: "6"}}, 51 {{Name: "a", Value: "7"}}, 52 {{Name: "a", Value: "8"}}, 53 {{Name: "a", Value: "9"}}, 54 // Missing 10 on purpose. 55 {{Name: "a", Value: "11"}}, 56 {{Name: "a", Value: "12"}}, 57 {{Name: "a", Value: "13"}}, 58 {{Name: "a", Value: "1"}, {Name: "longer-string", Value: "1"}}, 59 {{Name: "a", Value: "1"}, {Name: "longer-string", Value: "2"}}, 60 }, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) 61 testutil.Ok(t, err) 62 63 testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, id1.String()), metadata.NoneFunc)) 64 65 // Copy block index version 1 for backward compatibility. 66 /* The block here was produced at the commit 67 706602daed1487f7849990678b4ece4599745905 used in 2.0.0 with: 68 db, _ := Open("v1db", nil, nil, nil) 69 app := db.Appender() 70 app.Add(labels.FromStrings("foo", "bar"), 1, 2) 71 app.Add(labels.FromStrings("foo", "baz"), 3, 4) 72 app.Add(labels.FromStrings("foo", "meh"), 1000*3600*4, 4) // Not in the block. 73 // Make sure we've enough values for the lack of sorting of postings offsets to show up. 74 for i := 0; i < 100; i++ { 75 app.Add(labels.FromStrings("bar", strconv.FormatInt(int64(i), 10)), 0, 0) 76 } 77 app.Commit() 78 db.compact() 79 db.Close() 80 */ 81 82 m, err := metadata.ReadFromDir("./testdata/index_format_v1") 83 testutil.Ok(t, err) 84 e2eutil.Copy(t, "./testdata/index_format_v1", filepath.Join(tmpDir, m.ULID.String())) 85 86 _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String()), metadata.Thanos{ 87 Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), 88 Downsample: metadata.ThanosDownsample{Resolution: 0}, 89 Source: metadata.TestSource, 90 }, &m.BlockMeta) 91 testutil.Ok(t, err) 92 testutil.Ok(t, block.Upload(ctx, log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()), metadata.NoneFunc)) 93 94 for _, id := range []ulid.ULID{id1, m.ULID} { 95 t.Run(id.String(), func(t *testing.T) { 96 indexFile, err := fileutil.OpenMmapFile(filepath.Join(tmpDir, id.String(), block.IndexFilename)) 97 testutil.Ok(t, err) 98 defer func() { _ = indexFile.Close() }() 99 100 b := realByteSlice(indexFile.Bytes()) 101 102 t.Run("binary reader", func(t *testing.T) { 103 fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) 104 testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) 105 106 br, err := NewBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3) 107 testutil.Ok(t, err) 108 109 defer func() { testutil.Ok(t, br.Close()) }() 110 111 if id == id1 { 112 testutil.Equals(t, 1, br.version) 113 testutil.Equals(t, 2, br.indexVersion) 114 testutil.Equals(t, &BinaryTOC{Symbols: headerLen, PostingsOffsetTable: 69}, br.toc) 115 testutil.Equals(t, int64(710), br.indexLastPostingEnd) 116 testutil.Equals(t, 8, br.symbols.Size()) 117 testutil.Equals(t, 0, len(br.postingsV1)) 118 testutil.Equals(t, 2, len(br.nameSymbols)) 119 testutil.Equals(t, map[string]*postingValueOffsets{ 120 "": { 121 offsets: []postingOffset{{value: "", tableOff: 4}}, 122 lastValOffset: 440, 123 }, 124 "a": { 125 offsets: []postingOffset{ 126 {value: "1", tableOff: 9}, 127 {value: "13", tableOff: 32}, 128 {value: "4", tableOff: 54}, 129 {value: "7", tableOff: 75}, 130 {value: "9", tableOff: 89}, 131 }, 132 lastValOffset: 640, 133 }, 134 "longer-string": { 135 offsets: []postingOffset{ 136 {value: "1", tableOff: 96}, 137 {value: "2", tableOff: 115}, 138 }, 139 lastValOffset: 706, 140 }, 141 }, br.postings) 142 143 vals, err := br.LabelValues("not-existing") 144 testutil.Ok(t, err) 145 testutil.Equals(t, []string(nil), vals) 146 147 // Regression tests for https://github.com/thanos-io/thanos/issues/2213. 148 // Most of not existing value was working despite bug, except in certain unlucky cases 149 // it was causing "invalid size" errors. 150 _, err = br.PostingsOffset("not-existing", "1") 151 testutil.Equals(t, NotFoundRangeErr, err) 152 _, err = br.PostingsOffset("a", "0") 153 testutil.Equals(t, NotFoundRangeErr, err) 154 // Unlucky case, because the bug was causing unnecessary read & decode requiring more bytes than 155 // available. For rest cases read was noop wrong, but at least not failing. 156 _, err = br.PostingsOffset("a", "10") 157 testutil.Equals(t, NotFoundRangeErr, err) 158 _, err = br.PostingsOffset("a", "121") 159 testutil.Equals(t, NotFoundRangeErr, err) 160 _, err = br.PostingsOffset("a", "131") 161 testutil.Equals(t, NotFoundRangeErr, err) 162 _, err = br.PostingsOffset("a", "91") 163 testutil.Equals(t, NotFoundRangeErr, err) 164 _, err = br.PostingsOffset("longer-string", "0") 165 testutil.Equals(t, NotFoundRangeErr, err) 166 _, err = br.PostingsOffset("longer-string", "11") 167 testutil.Equals(t, NotFoundRangeErr, err) 168 _, err = br.PostingsOffset("longer-string", "21") 169 testutil.Equals(t, NotFoundRangeErr, err) 170 } 171 172 compareIndexToHeader(t, b, br) 173 }) 174 175 t.Run("lazy binary reader", func(t *testing.T) { 176 fn := filepath.Join(tmpDir, id.String(), block.IndexHeaderFilename) 177 testutil.Ok(t, WriteBinary(ctx, bkt, id, fn)) 178 179 br, err := NewLazyBinaryReader(ctx, log.NewNopLogger(), nil, tmpDir, id, 3, NewLazyBinaryReaderMetrics(nil), nil) 180 testutil.Ok(t, err) 181 182 defer func() { testutil.Ok(t, br.Close()) }() 183 184 compareIndexToHeader(t, b, br) 185 }) 186 }) 187 } 188 189} 190 191func compareIndexToHeader(t *testing.T, indexByteSlice index.ByteSlice, headerReader Reader) { 192 indexReader, err := index.NewReader(indexByteSlice) 193 testutil.Ok(t, err) 194 defer func() { _ = indexReader.Close() }() 195 196 actVersion, err := headerReader.IndexVersion() 197 testutil.Ok(t, err) 198 testutil.Equals(t, indexReader.Version(), actVersion) 199 200 if indexReader.Version() == index.FormatV2 { 201 // For v2 symbols ref sequential integers 0, 1, 2 etc. 202 iter := indexReader.Symbols() 203 i := 0 204 for iter.Next() { 205 r, err := headerReader.LookupSymbol(uint32(i)) 206 testutil.Ok(t, err) 207 testutil.Equals(t, iter.At(), r) 208 209 i++ 210 } 211 testutil.Ok(t, iter.Err()) 212 _, err := headerReader.LookupSymbol(uint32(i)) 213 testutil.NotOk(t, err) 214 215 } else { 216 // For v1 symbols refs are actual offsets in the index. 217 symbols, err := getSymbolTable(indexByteSlice) 218 testutil.Ok(t, err) 219 220 for refs, sym := range symbols { 221 r, err := headerReader.LookupSymbol(refs) 222 testutil.Ok(t, err) 223 testutil.Equals(t, sym, r) 224 } 225 _, err = headerReader.LookupSymbol(200000) 226 testutil.NotOk(t, err) 227 } 228 229 expLabelNames, err := indexReader.LabelNames() 230 testutil.Ok(t, err) 231 actualLabelNames, err := headerReader.LabelNames() 232 testutil.Ok(t, err) 233 testutil.Equals(t, expLabelNames, actualLabelNames) 234 235 expRanges, err := indexReader.PostingsRanges() 236 testutil.Ok(t, err) 237 238 minStart := int64(math.MaxInt64) 239 maxEnd := int64(math.MinInt64) 240 for il, lname := range expLabelNames { 241 expectedLabelVals, err := indexReader.SortedLabelValues(lname) 242 testutil.Ok(t, err) 243 244 vals, err := headerReader.LabelValues(lname) 245 testutil.Ok(t, err) 246 testutil.Equals(t, expectedLabelVals, vals) 247 248 for iv, v := range vals { 249 if minStart > expRanges[labels.Label{Name: lname, Value: v}].Start { 250 minStart = expRanges[labels.Label{Name: lname, Value: v}].Start 251 } 252 if maxEnd < expRanges[labels.Label{Name: lname, Value: v}].End { 253 maxEnd = expRanges[labels.Label{Name: lname, Value: v}].End 254 } 255 256 ptr, err := headerReader.PostingsOffset(lname, v) 257 testutil.Ok(t, err) 258 259 // For index-cache those values are exact. 260 // 261 // For binary they are exact except last item posting offset. It's good enough if the value is larger than exact posting ending. 262 if indexReader.Version() == index.FormatV2 { 263 if iv == len(vals)-1 && il == len(expLabelNames)-1 { 264 testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start) 265 testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End) 266 continue 267 } 268 } else { 269 // For index formatV1 the last one does not mean literally last value, as postings were not sorted. 270 // Account for that. We know it's 40 label value. 271 if v == "40" { 272 testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}].Start, ptr.Start) 273 testutil.Assert(t, expRanges[labels.Label{Name: lname, Value: v}].End <= ptr.End, "got offset %v earlier than actual posting end %v ", ptr.End, expRanges[labels.Label{Name: lname, Value: v}].End) 274 continue 275 } 276 } 277 testutil.Equals(t, expRanges[labels.Label{Name: lname, Value: v}], ptr) 278 } 279 } 280 281 ptr, err := headerReader.PostingsOffset(index.AllPostingsKey()) 282 testutil.Ok(t, err) 283 testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].Start, ptr.Start) 284 testutil.Equals(t, expRanges[labels.Label{Name: "", Value: ""}].End, ptr.End) 285} 286 287func prepareIndexV2Block(t testing.TB, tmpDir string, bkt objstore.Bucket) *metadata.Meta { 288 /* Copy index 6MB block index version 2. It was generated via thanosbench. Meta.json: 289 { 290 "ulid": "01DRBP4RNVZ94135ZA6B10EMRR", 291 "minTime": 1570766415000, 292 "maxTime": 1570939215001, 293 "stats": { 294 "numSamples": 115210000, 295 "numSeries": 10000, 296 "numChunks": 990000 297 }, 298 "compaction": { 299 "level": 1, 300 "sources": [ 301 "01DRBP4RNVZ94135ZA6B10EMRR" 302 ] 303 }, 304 "version": 1, 305 "thanos": { 306 "labels": { 307 "cluster": "one", 308 "dataset": "continuous" 309 }, 310 "downsample": { 311 "resolution": 0 312 }, 313 "source": "blockgen" 314 } 315 } 316 */ 317 318 m, err := metadata.ReadFromDir("./testdata/index_format_v2") 319 testutil.Ok(t, err) 320 e2eutil.Copy(t, "./testdata/index_format_v2", filepath.Join(tmpDir, m.ULID.String())) 321 322 _, err = metadata.InjectThanos(log.NewNopLogger(), filepath.Join(tmpDir, m.ULID.String()), metadata.Thanos{ 323 Labels: labels.Labels{{Name: "ext1", Value: "1"}}.Map(), 324 Downsample: metadata.ThanosDownsample{Resolution: 0}, 325 Source: metadata.TestSource, 326 }, &m.BlockMeta) 327 testutil.Ok(t, err) 328 testutil.Ok(t, block.Upload(context.Background(), log.NewNopLogger(), bkt, filepath.Join(tmpDir, m.ULID.String()), metadata.NoneFunc)) 329 330 return m 331} 332 333func BenchmarkBinaryWrite(t *testing.B) { 334 ctx := context.Background() 335 336 tmpDir, err := ioutil.TempDir("", "bench-indexheader") 337 testutil.Ok(t, err) 338 defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() 339 340 bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) 341 testutil.Ok(t, err) 342 defer func() { testutil.Ok(t, bkt.Close()) }() 343 344 m := prepareIndexV2Block(t, tmpDir, bkt) 345 fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) 346 347 t.ResetTimer() 348 for i := 0; i < t.N; i++ { 349 testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) 350 } 351} 352 353func BenchmarkBinaryReader(t *testing.B) { 354 ctx := context.Background() 355 tmpDir, err := ioutil.TempDir("", "bench-indexheader") 356 testutil.Ok(t, err) 357 defer func() { testutil.Ok(t, os.RemoveAll(tmpDir)) }() 358 359 bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) 360 testutil.Ok(t, err) 361 362 m := prepareIndexV2Block(t, tmpDir, bkt) 363 fn := filepath.Join(tmpDir, m.ULID.String(), block.IndexHeaderFilename) 364 testutil.Ok(t, WriteBinary(ctx, bkt, m.ULID, fn)) 365 366 t.ResetTimer() 367 for i := 0; i < t.N; i++ { 368 br, err := newFileBinaryReader(fn, 32) 369 testutil.Ok(t, err) 370 testutil.Ok(t, br.Close()) 371 } 372} 373 374func BenchmarkBinaryReader_LookupSymbol(b *testing.B) { 375 for _, numSeries := range []int{valueSymbolsCacheSize, valueSymbolsCacheSize * 10} { 376 b.Run(fmt.Sprintf("num series = %d", numSeries), func(b *testing.B) { 377 benchmarkBinaryReaderLookupSymbol(b, numSeries) 378 }) 379 } 380} 381 382func benchmarkBinaryReaderLookupSymbol(b *testing.B, numSeries int) { 383 const postingOffsetsInMemSampling = 32 384 385 ctx := context.Background() 386 logger := log.NewNopLogger() 387 388 tmpDir, err := ioutil.TempDir("", "benchmark-lookupsymbol") 389 testutil.Ok(b, err) 390 defer func() { testutil.Ok(b, os.RemoveAll(tmpDir)) }() 391 392 bkt, err := filesystem.NewBucket(filepath.Join(tmpDir, "bkt")) 393 testutil.Ok(b, err) 394 defer func() { testutil.Ok(b, bkt.Close()) }() 395 396 // Generate series labels. 397 seriesLabels := make([]labels.Labels, 0, numSeries) 398 for i := 0; i < numSeries; i++ { 399 seriesLabels = append(seriesLabels, labels.Labels{{Name: "a", Value: strconv.Itoa(i)}}) 400 } 401 402 // Create a block. 403 id1, err := e2eutil.CreateBlock(ctx, tmpDir, seriesLabels, 100, 0, 1000, labels.Labels{{Name: "ext1", Value: "1"}}, 124, metadata.NoneFunc) 404 testutil.Ok(b, err) 405 testutil.Ok(b, block.Upload(ctx, logger, bkt, filepath.Join(tmpDir, id1.String()), metadata.NoneFunc)) 406 407 // Create an index reader. 408 reader, err := NewBinaryReader(ctx, logger, bkt, tmpDir, id1, postingOffsetsInMemSampling) 409 testutil.Ok(b, err) 410 411 // Get the offset of each label value symbol. 412 symbolsOffsets := make([]uint32, numSeries) 413 for i := 0; i < numSeries; i++ { 414 o, err := reader.symbols.ReverseLookup(strconv.Itoa(i)) 415 testutil.Ok(b, err) 416 417 symbolsOffsets[i] = o 418 } 419 420 b.ResetTimer() 421 422 for n := 0; n < b.N; n++ { 423 for i := 0; i < len(symbolsOffsets); i++ { 424 if _, err := reader.LookupSymbol(symbolsOffsets[i]); err != nil { 425 b.Fail() 426 } 427 } 428 } 429} 430 431func getSymbolTable(b index.ByteSlice) (map[uint32]string, error) { 432 version := int(b.Range(4, 5)[0]) 433 434 if version != 1 && version != 2 { 435 return nil, errors.Errorf("unknown index file version %d", version) 436 } 437 438 toc, err := index.NewTOCFromByteSlice(b) 439 if err != nil { 440 return nil, errors.Wrap(err, "read TOC") 441 } 442 443 symbolsV2, symbolsV1, err := readSymbols(b, version, int(toc.Symbols)) 444 if err != nil { 445 return nil, errors.Wrap(err, "read symbols") 446 } 447 448 symbolsTable := make(map[uint32]string, len(symbolsV1)+len(symbolsV2)) 449 for o, s := range symbolsV1 { 450 symbolsTable[o] = s 451 } 452 for o, s := range symbolsV2 { 453 symbolsTable[uint32(o)] = s 454 } 455 return symbolsTable, nil 456} 457 458// readSymbols reads the symbol table fully into memory and allocates proper strings for them. 459// Strings backed by the mmap'd memory would cause memory faults if applications keep using them 460// after the reader is closed. 461func readSymbols(bs index.ByteSlice, version int, off int) ([]string, map[uint32]string, error) { 462 if off == 0 { 463 return nil, nil, nil 464 } 465 d := encoding.NewDecbufAt(bs, off, castagnoliTable) 466 467 var ( 468 origLen = d.Len() 469 cnt = d.Be32int() 470 basePos = uint32(off) + 4 471 nextPos = basePos + uint32(origLen-d.Len()) 472 symbolSlice []string 473 symbols = map[uint32]string{} 474 ) 475 if version == index.FormatV2 { 476 symbolSlice = make([]string, 0, cnt) 477 } 478 479 for d.Err() == nil && d.Len() > 0 && cnt > 0 { 480 s := d.UvarintStr() 481 482 if version == index.FormatV2 { 483 symbolSlice = append(symbolSlice, s) 484 } else { 485 symbols[nextPos] = s 486 nextPos = basePos + uint32(origLen-d.Len()) 487 } 488 cnt-- 489 } 490 return symbolSlice, symbols, errors.Wrap(d.Err(), "read symbols") 491} 492