1// Copyright 2017 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package tsdb 15 16import ( 17 "context" 18 "fmt" 19 "io/ioutil" 20 "math" 21 "math/rand" 22 "os" 23 "path/filepath" 24 "sort" 25 "strconv" 26 "sync" 27 "testing" 28 29 "github.com/pkg/errors" 30 prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" 31 "github.com/stretchr/testify/require" 32 33 "github.com/prometheus/prometheus/pkg/exemplar" 34 "github.com/prometheus/prometheus/pkg/labels" 35 "github.com/prometheus/prometheus/storage" 36 "github.com/prometheus/prometheus/tsdb/chunkenc" 37 "github.com/prometheus/prometheus/tsdb/chunks" 38 "github.com/prometheus/prometheus/tsdb/index" 39 "github.com/prometheus/prometheus/tsdb/record" 40 "github.com/prometheus/prometheus/tsdb/tombstones" 41 "github.com/prometheus/prometheus/tsdb/tsdbutil" 42 "github.com/prometheus/prometheus/tsdb/wal" 43) 44 45func newTestHead(t testing.TB, chunkRange int64, compressWAL bool) (*Head, *wal.WAL) { 46 dir, err := ioutil.TempDir("", "test") 47 require.NoError(t, err) 48 wlog, err := wal.NewSize(nil, nil, filepath.Join(dir, "wal"), 32768, compressWAL) 49 require.NoError(t, err) 50 51 opts := DefaultHeadOptions() 52 opts.ChunkRange = chunkRange 53 opts.ChunkDirRoot = dir 54 opts.NumExemplars = 10 55 h, err := NewHead(nil, nil, wlog, opts, nil) 56 require.NoError(t, err) 57 58 require.NoError(t, h.chunkDiskMapper.IterateAllChunks(func(_, _ uint64, _, _ int64, _ uint16) error { return nil })) 59 60 t.Cleanup(func() { 61 require.NoError(t, os.RemoveAll(dir)) 62 }) 63 return h, wlog 64} 65 66func BenchmarkCreateSeries(b *testing.B) { 67 series := genSeries(b.N, 10, 0, 0) 68 h, _ := newTestHead(b, 10000, false) 69 defer func() { 70 require.NoError(b, h.Close()) 71 }() 72 73 b.ReportAllocs() 74 b.ResetTimer() 75 76 for _, s := range series { 77 h.getOrCreate(s.Labels().Hash(), s.Labels()) 78 } 79} 80 81func populateTestWAL(t testing.TB, w *wal.WAL, recs []interface{}) { 82 var enc record.Encoder 83 for _, r := range recs { 84 switch v := r.(type) { 85 case []record.RefSeries: 86 require.NoError(t, w.Log(enc.Series(v, nil))) 87 case []record.RefSample: 88 require.NoError(t, w.Log(enc.Samples(v, nil))) 89 case []tombstones.Stone: 90 require.NoError(t, w.Log(enc.Tombstones(v, nil))) 91 case []record.RefExemplar: 92 require.NoError(t, w.Log(enc.Exemplars(v, nil))) 93 } 94 } 95} 96 97func readTestWAL(t testing.TB, dir string) (recs []interface{}) { 98 sr, err := wal.NewSegmentsReader(dir) 99 require.NoError(t, err) 100 defer sr.Close() 101 102 var dec record.Decoder 103 r := wal.NewReader(sr) 104 105 for r.Next() { 106 rec := r.Record() 107 108 switch dec.Type(rec) { 109 case record.Series: 110 series, err := dec.Series(rec, nil) 111 require.NoError(t, err) 112 recs = append(recs, series) 113 case record.Samples: 114 samples, err := dec.Samples(rec, nil) 115 require.NoError(t, err) 116 recs = append(recs, samples) 117 case record.Tombstones: 118 tstones, err := dec.Tombstones(rec, nil) 119 require.NoError(t, err) 120 recs = append(recs, tstones) 121 default: 122 t.Fatalf("unknown record type") 123 } 124 } 125 require.NoError(t, r.Err()) 126 return recs 127} 128 129func BenchmarkLoadWAL(b *testing.B) { 130 cases := []struct { 131 // Total series is (batches*seriesPerBatch). 132 batches int 133 seriesPerBatch int 134 samplesPerSeries int 135 }{ 136 { // Less series and more samples. 2 hour WAL with 1 second scrape interval. 137 batches: 10, 138 seriesPerBatch: 100, 139 samplesPerSeries: 7200, 140 }, 141 { // More series and less samples. 142 batches: 10, 143 seriesPerBatch: 10000, 144 samplesPerSeries: 50, 145 }, 146 { // In between. 147 batches: 10, 148 seriesPerBatch: 1000, 149 samplesPerSeries: 480, 150 }, 151 } 152 153 labelsPerSeries := 5 154 // Rough estimates of most common % of samples that have an exemplar for each scrape. 155 exemplarsPercentages := []float64{0, 0.5, 1, 5} 156 lastExemplarsPerSeries := -1 157 for _, c := range cases { 158 for _, p := range exemplarsPercentages { 159 exemplarsPerSeries := int(math.RoundToEven(float64(c.samplesPerSeries) * p / 100)) 160 // For tests with low samplesPerSeries we could end up testing with 0 exemplarsPerSeries 161 // multiple times without this check. 162 if exemplarsPerSeries == lastExemplarsPerSeries { 163 continue 164 } 165 lastExemplarsPerSeries = exemplarsPerSeries 166 // fmt.Println("exemplars per series: ", exemplarsPerSeries) 167 b.Run(fmt.Sprintf("batches=%d,seriesPerBatch=%d,samplesPerSeries=%d,exemplarsPerSeries=%d", c.batches, c.seriesPerBatch, c.samplesPerSeries, exemplarsPerSeries), 168 func(b *testing.B) { 169 dir, err := ioutil.TempDir("", "test_load_wal") 170 require.NoError(b, err) 171 defer func() { 172 require.NoError(b, os.RemoveAll(dir)) 173 }() 174 175 w, err := wal.New(nil, nil, dir, false) 176 require.NoError(b, err) 177 178 // Write series. 179 refSeries := make([]record.RefSeries, 0, c.seriesPerBatch) 180 for k := 0; k < c.batches; k++ { 181 refSeries = refSeries[:0] 182 for i := k * c.seriesPerBatch; i < (k+1)*c.seriesPerBatch; i++ { 183 lbls := make(map[string]string, labelsPerSeries) 184 lbls[defaultLabelName] = strconv.Itoa(i) 185 for j := 1; len(lbls) < labelsPerSeries; j++ { 186 lbls[defaultLabelName+strconv.Itoa(j)] = defaultLabelValue + strconv.Itoa(j) 187 } 188 refSeries = append(refSeries, record.RefSeries{Ref: uint64(i) * 100, Labels: labels.FromMap(lbls)}) 189 } 190 populateTestWAL(b, w, []interface{}{refSeries}) 191 } 192 193 // Write samples. 194 refSamples := make([]record.RefSample, 0, c.seriesPerBatch) 195 for i := 0; i < c.samplesPerSeries; i++ { 196 for j := 0; j < c.batches; j++ { 197 refSamples = refSamples[:0] 198 for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { 199 refSamples = append(refSamples, record.RefSample{ 200 Ref: uint64(k) * 100, 201 T: int64(i) * 10, 202 V: float64(i) * 100, 203 }) 204 } 205 populateTestWAL(b, w, []interface{}{refSamples}) 206 } 207 } 208 209 // Write samples. 210 refExemplars := make([]record.RefExemplar, 0, c.seriesPerBatch) 211 for i := 0; i < exemplarsPerSeries; i++ { 212 for j := 0; j < c.batches; j++ { 213 refExemplars = refExemplars[:0] 214 for k := j * c.seriesPerBatch; k < (j+1)*c.seriesPerBatch; k++ { 215 refExemplars = append(refExemplars, record.RefExemplar{ 216 Ref: uint64(k) * 100, 217 T: int64(i) * 10, 218 V: float64(i) * 100, 219 Labels: labels.FromStrings("traceID", fmt.Sprintf("trace-%d", i)), 220 }) 221 } 222 populateTestWAL(b, w, []interface{}{refExemplars}) 223 } 224 } 225 226 b.ResetTimer() 227 228 // Load the WAL. 229 for i := 0; i < b.N; i++ { 230 opts := DefaultHeadOptions() 231 opts.ChunkRange = 1000 232 opts.ChunkDirRoot = w.Dir() 233 h, err := NewHead(nil, nil, w, opts, nil) 234 require.NoError(b, err) 235 h.Init(0) 236 } 237 }) 238 } 239 } 240} 241 242func TestHead_ReadWAL(t *testing.T) { 243 for _, compress := range []bool{false, true} { 244 t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { 245 entries := []interface{}{ 246 []record.RefSeries{ 247 {Ref: 10, Labels: labels.FromStrings("a", "1")}, 248 {Ref: 11, Labels: labels.FromStrings("a", "2")}, 249 {Ref: 100, Labels: labels.FromStrings("a", "3")}, 250 }, 251 []record.RefSample{ 252 {Ref: 0, T: 99, V: 1}, 253 {Ref: 10, T: 100, V: 2}, 254 {Ref: 100, T: 100, V: 3}, 255 }, 256 []record.RefSeries{ 257 {Ref: 50, Labels: labels.FromStrings("a", "4")}, 258 // This series has two refs pointing to it. 259 {Ref: 101, Labels: labels.FromStrings("a", "3")}, 260 }, 261 []record.RefSample{ 262 {Ref: 10, T: 101, V: 5}, 263 {Ref: 50, T: 101, V: 6}, 264 {Ref: 101, T: 101, V: 7}, 265 }, 266 []tombstones.Stone{ 267 {Ref: 0, Intervals: []tombstones.Interval{{Mint: 99, Maxt: 101}}}, 268 }, 269 []record.RefExemplar{ 270 {Ref: 10, T: 100, V: 1, Labels: labels.FromStrings("traceID", "asdf")}, 271 }, 272 } 273 274 head, w := newTestHead(t, 1000, compress) 275 defer func() { 276 require.NoError(t, head.Close()) 277 }() 278 279 populateTestWAL(t, w, entries) 280 281 require.NoError(t, head.Init(math.MinInt64)) 282 require.Equal(t, uint64(101), head.lastSeriesID.Load()) 283 284 s10 := head.series.getByID(10) 285 s11 := head.series.getByID(11) 286 s50 := head.series.getByID(50) 287 s100 := head.series.getByID(100) 288 289 require.Equal(t, labels.FromStrings("a", "1"), s10.lset) 290 require.Equal(t, (*memSeries)(nil), s11) // Series without samples should be garbage collected at head.Init(). 291 require.Equal(t, labels.FromStrings("a", "4"), s50.lset) 292 require.Equal(t, labels.FromStrings("a", "3"), s100.lset) 293 294 expandChunk := func(c chunkenc.Iterator) (x []sample) { 295 for c.Next() { 296 t, v := c.At() 297 x = append(x, sample{t: t, v: v}) 298 } 299 require.NoError(t, c.Err()) 300 return x 301 } 302 require.Equal(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0, nil, head.chunkDiskMapper, nil))) 303 require.Equal(t, []sample{{101, 6}}, expandChunk(s50.iterator(0, nil, head.chunkDiskMapper, nil))) 304 require.Equal(t, []sample{{100, 3}, {101, 7}}, expandChunk(s100.iterator(0, nil, head.chunkDiskMapper, nil))) 305 306 q, err := head.ExemplarQuerier(context.Background()) 307 require.NoError(t, err) 308 e, err := q.Select(0, 1000, []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "1")}) 309 require.NoError(t, err) 310 require.Equal(t, e[0].Exemplars[0], exemplar.Exemplar{Ts: 100, Value: 1, Labels: labels.FromStrings("traceID", "asdf")}) 311 }) 312 } 313} 314 315func TestHead_WALMultiRef(t *testing.T) { 316 head, w := newTestHead(t, 1000, false) 317 318 require.NoError(t, head.Init(0)) 319 320 app := head.Appender(context.Background()) 321 ref1, err := app.Append(0, labels.FromStrings("foo", "bar"), 100, 1) 322 require.NoError(t, err) 323 require.NoError(t, app.Commit()) 324 require.Equal(t, 1.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) 325 326 // Add another sample outside chunk range to mmap a chunk. 327 app = head.Appender(context.Background()) 328 _, err = app.Append(0, labels.FromStrings("foo", "bar"), 1500, 2) 329 require.NoError(t, err) 330 require.NoError(t, app.Commit()) 331 require.Equal(t, 2.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) 332 333 require.NoError(t, head.Truncate(1600)) 334 335 app = head.Appender(context.Background()) 336 ref2, err := app.Append(0, labels.FromStrings("foo", "bar"), 1700, 3) 337 require.NoError(t, err) 338 require.NoError(t, app.Commit()) 339 require.Equal(t, 3.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) 340 341 // Add another sample outside chunk range to mmap a chunk. 342 app = head.Appender(context.Background()) 343 _, err = app.Append(0, labels.FromStrings("foo", "bar"), 2000, 4) 344 require.NoError(t, err) 345 require.NoError(t, app.Commit()) 346 require.Equal(t, 4.0, prom_testutil.ToFloat64(head.metrics.chunksCreated)) 347 348 require.NotEqual(t, ref1, ref2, "Refs are the same") 349 require.NoError(t, head.Close()) 350 351 w, err = wal.New(nil, nil, w.Dir(), false) 352 require.NoError(t, err) 353 354 opts := DefaultHeadOptions() 355 opts.ChunkRange = 1000 356 opts.ChunkDirRoot = w.Dir() 357 head, err = NewHead(nil, nil, w, opts, nil) 358 require.NoError(t, err) 359 require.NoError(t, head.Init(0)) 360 defer func() { 361 require.NoError(t, head.Close()) 362 }() 363 364 q, err := NewBlockQuerier(head, 0, 2100) 365 require.NoError(t, err) 366 series := query(t, q, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) 367 require.Equal(t, map[string][]tsdbutil.Sample{`{foo="bar"}`: { 368 sample{100, 1}, 369 sample{1500, 2}, 370 sample{1700, 3}, 371 sample{2000, 4}, 372 }}, series) 373} 374 375func TestHead_UnknownWALRecord(t *testing.T) { 376 head, w := newTestHead(t, 1000, false) 377 w.Log([]byte{255, 42}) 378 require.NoError(t, head.Init(0)) 379 require.NoError(t, head.Close()) 380} 381 382func TestHead_Truncate(t *testing.T) { 383 h, _ := newTestHead(t, 1000, false) 384 defer func() { 385 require.NoError(t, h.Close()) 386 }() 387 388 h.initTime(0) 389 390 s1, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1", "b", "1")) 391 s2, _, _ := h.getOrCreate(2, labels.FromStrings("a", "2", "b", "1")) 392 s3, _, _ := h.getOrCreate(3, labels.FromStrings("a", "1", "b", "2")) 393 s4, _, _ := h.getOrCreate(4, labels.FromStrings("a", "2", "b", "2", "c", "1")) 394 395 s1.mmappedChunks = []*mmappedChunk{ 396 {minTime: 0, maxTime: 999}, 397 {minTime: 1000, maxTime: 1999}, 398 {minTime: 2000, maxTime: 2999}, 399 } 400 s2.mmappedChunks = []*mmappedChunk{ 401 {minTime: 1000, maxTime: 1999}, 402 {minTime: 2000, maxTime: 2999}, 403 {minTime: 3000, maxTime: 3999}, 404 } 405 s3.mmappedChunks = []*mmappedChunk{ 406 {minTime: 0, maxTime: 999}, 407 {minTime: 1000, maxTime: 1999}, 408 } 409 s4.mmappedChunks = []*mmappedChunk{} 410 411 // Truncation need not be aligned. 412 require.NoError(t, h.Truncate(1)) 413 414 require.NoError(t, h.Truncate(2000)) 415 416 require.Equal(t, []*mmappedChunk{ 417 {minTime: 2000, maxTime: 2999}, 418 }, h.series.getByID(s1.ref).mmappedChunks) 419 420 require.Equal(t, []*mmappedChunk{ 421 {minTime: 2000, maxTime: 2999}, 422 {minTime: 3000, maxTime: 3999}, 423 }, h.series.getByID(s2.ref).mmappedChunks) 424 425 require.Nil(t, h.series.getByID(s3.ref)) 426 require.Nil(t, h.series.getByID(s4.ref)) 427 428 postingsA1, _ := index.ExpandPostings(h.postings.Get("a", "1")) 429 postingsA2, _ := index.ExpandPostings(h.postings.Get("a", "2")) 430 postingsB1, _ := index.ExpandPostings(h.postings.Get("b", "1")) 431 postingsB2, _ := index.ExpandPostings(h.postings.Get("b", "2")) 432 postingsC1, _ := index.ExpandPostings(h.postings.Get("c", "1")) 433 postingsAll, _ := index.ExpandPostings(h.postings.Get("", "")) 434 435 require.Equal(t, []uint64{s1.ref}, postingsA1) 436 require.Equal(t, []uint64{s2.ref}, postingsA2) 437 require.Equal(t, []uint64{s1.ref, s2.ref}, postingsB1) 438 require.Equal(t, []uint64{s1.ref, s2.ref}, postingsAll) 439 require.Nil(t, postingsB2) 440 require.Nil(t, postingsC1) 441 442 require.Equal(t, map[string]struct{}{ 443 "": {}, // from 'all' postings list 444 "a": {}, 445 "b": {}, 446 "1": {}, 447 "2": {}, 448 }, h.symbols) 449 450 values := map[string]map[string]struct{}{} 451 for _, name := range h.postings.LabelNames() { 452 ss, ok := values[name] 453 if !ok { 454 ss = map[string]struct{}{} 455 values[name] = ss 456 } 457 for _, value := range h.postings.LabelValues(name) { 458 ss[value] = struct{}{} 459 } 460 } 461 require.Equal(t, map[string]map[string]struct{}{ 462 "a": {"1": struct{}{}, "2": struct{}{}}, 463 "b": {"1": struct{}{}}, 464 }, values) 465} 466 467// Validate various behaviors brought on by firstChunkID accounting for 468// garbage collected chunks. 469func TestMemSeries_truncateChunks(t *testing.T) { 470 dir, err := ioutil.TempDir("", "truncate_chunks") 471 require.NoError(t, err) 472 defer func() { 473 require.NoError(t, os.RemoveAll(dir)) 474 }() 475 // This is usually taken from the Head, but passing manually here. 476 chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) 477 require.NoError(t, err) 478 defer func() { 479 require.NoError(t, chunkDiskMapper.Close()) 480 }() 481 482 memChunkPool := sync.Pool{ 483 New: func() interface{} { 484 return &memChunk{} 485 }, 486 } 487 488 s := newMemSeries(labels.FromStrings("a", "b"), 1, 2000, &memChunkPool) 489 490 for i := 0; i < 4000; i += 5 { 491 ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) 492 require.True(t, ok, "sample append failed") 493 } 494 495 // Check that truncate removes half of the chunks and afterwards 496 // that the ID of the last chunk still gives us the same chunk afterwards. 497 countBefore := len(s.mmappedChunks) + 1 // +1 for the head chunk. 498 lastID := s.chunkID(countBefore - 1) 499 lastChunk, _, err := s.chunk(lastID, chunkDiskMapper) 500 require.NoError(t, err) 501 require.NotNil(t, lastChunk) 502 503 chk, _, err := s.chunk(0, chunkDiskMapper) 504 require.NotNil(t, chk) 505 require.NoError(t, err) 506 507 s.truncateChunksBefore(2000) 508 509 require.Equal(t, int64(2000), s.mmappedChunks[0].minTime) 510 _, _, err = s.chunk(0, chunkDiskMapper) 511 require.Equal(t, storage.ErrNotFound, err, "first chunks not gone") 512 require.Equal(t, countBefore/2, len(s.mmappedChunks)+1) // +1 for the head chunk. 513 chk, _, err = s.chunk(lastID, chunkDiskMapper) 514 require.NoError(t, err) 515 require.Equal(t, lastChunk, chk) 516 517 // Validate that the series' sample buffer is applied correctly to the last chunk 518 // after truncation. 519 it1 := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) 520 _, ok := it1.(*memSafeIterator) 521 require.True(t, ok) 522 523 it2 := s.iterator(s.chunkID(len(s.mmappedChunks)-1), nil, chunkDiskMapper, nil) 524 _, ok = it2.(*memSafeIterator) 525 require.False(t, ok, "non-last chunk incorrectly wrapped with sample buffer") 526} 527 528func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { 529 for _, compress := range []bool{false, true} { 530 t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { 531 entries := []interface{}{ 532 []record.RefSeries{ 533 {Ref: 10, Labels: labels.FromStrings("a", "1")}, 534 }, 535 []record.RefSample{}, 536 []record.RefSeries{ 537 {Ref: 50, Labels: labels.FromStrings("a", "2")}, 538 }, 539 []record.RefSample{ 540 {Ref: 50, T: 80, V: 1}, 541 {Ref: 50, T: 90, V: 1}, 542 }, 543 } 544 head, w := newTestHead(t, 1000, compress) 545 defer func() { 546 require.NoError(t, head.Close()) 547 }() 548 549 populateTestWAL(t, w, entries) 550 551 require.NoError(t, head.Init(math.MinInt64)) 552 553 require.NoError(t, head.Delete(0, 100, labels.MustNewMatcher(labels.MatchEqual, "a", "1"))) 554 }) 555 } 556} 557 558func TestHeadDeleteSimple(t *testing.T) { 559 buildSmpls := func(s []int64) []sample { 560 ss := make([]sample, 0, len(s)) 561 for _, t := range s { 562 ss = append(ss, sample{t: t, v: float64(t)}) 563 } 564 return ss 565 } 566 smplsAll := buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}) 567 lblDefault := labels.Label{Name: "a", Value: "b"} 568 569 cases := []struct { 570 dranges tombstones.Intervals 571 addSamples []sample // Samples to add after delete. 572 smplsExp []sample 573 }{ 574 { 575 dranges: tombstones.Intervals{{Mint: 0, Maxt: 3}}, 576 smplsExp: buildSmpls([]int64{4, 5, 6, 7, 8, 9}), 577 }, 578 { 579 dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}}, 580 smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9}), 581 }, 582 { 583 dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 7}}, 584 smplsExp: buildSmpls([]int64{0, 8, 9}), 585 }, 586 { 587 dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}, {Mint: 4, Maxt: 700}}, 588 smplsExp: buildSmpls([]int64{0}), 589 }, 590 { // This case is to ensure that labels and symbols are deleted. 591 dranges: tombstones.Intervals{{Mint: 0, Maxt: 9}}, 592 smplsExp: buildSmpls([]int64{}), 593 }, 594 { 595 dranges: tombstones.Intervals{{Mint: 1, Maxt: 3}}, 596 addSamples: buildSmpls([]int64{11, 13, 15}), 597 smplsExp: buildSmpls([]int64{0, 4, 5, 6, 7, 8, 9, 11, 13, 15}), 598 }, 599 { 600 // After delete, the appended samples in the deleted range should be visible 601 // as the tombstones are clamped to head min/max time. 602 dranges: tombstones.Intervals{{Mint: 7, Maxt: 20}}, 603 addSamples: buildSmpls([]int64{11, 13, 15}), 604 smplsExp: buildSmpls([]int64{0, 1, 2, 3, 4, 5, 6, 11, 13, 15}), 605 }, 606 } 607 608 for _, compress := range []bool{false, true} { 609 t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { 610 for _, c := range cases { 611 head, w := newTestHead(t, 1000, compress) 612 613 app := head.Appender(context.Background()) 614 for _, smpl := range smplsAll { 615 _, err := app.Append(0, labels.Labels{lblDefault}, smpl.t, smpl.v) 616 require.NoError(t, err) 617 618 } 619 require.NoError(t, app.Commit()) 620 621 // Delete the ranges. 622 for _, r := range c.dranges { 623 require.NoError(t, head.Delete(r.Mint, r.Maxt, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value))) 624 } 625 626 // Add more samples. 627 app = head.Appender(context.Background()) 628 for _, smpl := range c.addSamples { 629 _, err := app.Append(0, labels.Labels{lblDefault}, smpl.t, smpl.v) 630 require.NoError(t, err) 631 632 } 633 require.NoError(t, app.Commit()) 634 635 // Compare the samples for both heads - before and after the reloadBlocks. 636 reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reloadBlocks. 637 require.NoError(t, err) 638 opts := DefaultHeadOptions() 639 opts.ChunkRange = 1000 640 opts.ChunkDirRoot = reloadedW.Dir() 641 reloadedHead, err := NewHead(nil, nil, reloadedW, opts, nil) 642 require.NoError(t, err) 643 require.NoError(t, reloadedHead.Init(0)) 644 645 // Compare the query results for both heads - before and after the reloadBlocks. 646 Outer: 647 for _, h := range []*Head{head, reloadedHead} { 648 q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) 649 require.NoError(t, err) 650 actSeriesSet := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, lblDefault.Name, lblDefault.Value)) 651 require.NoError(t, q.Close()) 652 expSeriesSet := newMockSeriesSet([]storage.Series{ 653 storage.NewListSeries(labels.Labels{lblDefault}, func() []tsdbutil.Sample { 654 ss := make([]tsdbutil.Sample, 0, len(c.smplsExp)) 655 for _, s := range c.smplsExp { 656 ss = append(ss, s) 657 } 658 return ss 659 }(), 660 ), 661 }) 662 663 for { 664 eok, rok := expSeriesSet.Next(), actSeriesSet.Next() 665 require.Equal(t, eok, rok) 666 667 if !eok { 668 require.NoError(t, h.Close()) 669 require.NoError(t, actSeriesSet.Err()) 670 require.Equal(t, 0, len(actSeriesSet.Warnings())) 671 continue Outer 672 } 673 expSeries := expSeriesSet.At() 674 actSeries := actSeriesSet.At() 675 676 require.Equal(t, expSeries.Labels(), actSeries.Labels()) 677 678 smplExp, errExp := storage.ExpandSamples(expSeries.Iterator(), nil) 679 smplRes, errRes := storage.ExpandSamples(actSeries.Iterator(), nil) 680 681 require.Equal(t, errExp, errRes) 682 require.Equal(t, smplExp, smplRes) 683 } 684 } 685 } 686 }) 687 } 688} 689 690func TestDeleteUntilCurMax(t *testing.T) { 691 hb, _ := newTestHead(t, 1000000, false) 692 defer func() { 693 require.NoError(t, hb.Close()) 694 }() 695 696 numSamples := int64(10) 697 app := hb.Appender(context.Background()) 698 smpls := make([]float64, numSamples) 699 for i := int64(0); i < numSamples; i++ { 700 smpls[i] = rand.Float64() 701 _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, i, smpls[i]) 702 require.NoError(t, err) 703 } 704 require.NoError(t, app.Commit()) 705 require.NoError(t, hb.Delete(0, 10000, labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) 706 707 // Test the series returns no samples. The series is cleared only after compaction. 708 q, err := NewBlockQuerier(hb, 0, 100000) 709 require.NoError(t, err) 710 res := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) 711 require.True(t, res.Next(), "series is not present") 712 s := res.At() 713 it := s.Iterator() 714 require.False(t, it.Next(), "expected no samples") 715 for res.Next() { 716 } 717 require.NoError(t, res.Err()) 718 require.Equal(t, 0, len(res.Warnings())) 719 720 // Add again and test for presence. 721 app = hb.Appender(context.Background()) 722 _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 11, 1) 723 require.NoError(t, err) 724 require.NoError(t, app.Commit()) 725 q, err = NewBlockQuerier(hb, 0, 100000) 726 require.NoError(t, err) 727 res = q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "b")) 728 require.True(t, res.Next(), "series don't exist") 729 exps := res.At() 730 it = exps.Iterator() 731 resSamples, err := storage.ExpandSamples(it, newSample) 732 require.NoError(t, err) 733 require.Equal(t, []tsdbutil.Sample{sample{11, 1}}, resSamples) 734 for res.Next() { 735 } 736 require.NoError(t, res.Err()) 737 require.Equal(t, 0, len(res.Warnings())) 738} 739 740func TestDeletedSamplesAndSeriesStillInWALAfterCheckpoint(t *testing.T) { 741 numSamples := 10000 742 743 // Enough samples to cause a checkpoint. 744 hb, w := newTestHead(t, int64(numSamples)*10, false) 745 746 for i := 0; i < numSamples; i++ { 747 app := hb.Appender(context.Background()) 748 _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, int64(i), 0) 749 require.NoError(t, err) 750 require.NoError(t, app.Commit()) 751 } 752 require.NoError(t, hb.Delete(0, int64(numSamples), labels.MustNewMatcher(labels.MatchEqual, "a", "b"))) 753 require.NoError(t, hb.Truncate(1)) 754 require.NoError(t, hb.Close()) 755 756 // Confirm there's been a checkpoint. 757 cdir, _, err := wal.LastCheckpoint(w.Dir()) 758 require.NoError(t, err) 759 // Read in checkpoint and WAL. 760 recs := readTestWAL(t, cdir) 761 recs = append(recs, readTestWAL(t, w.Dir())...) 762 763 var series, samples, stones int 764 for _, rec := range recs { 765 switch rec.(type) { 766 case []record.RefSeries: 767 series++ 768 case []record.RefSample: 769 samples++ 770 case []tombstones.Stone: 771 stones++ 772 default: 773 t.Fatalf("unknown record type") 774 } 775 } 776 require.Equal(t, 1, series) 777 require.Equal(t, 9999, samples) 778 require.Equal(t, 1, stones) 779 780} 781 782func TestDelete_e2e(t *testing.T) { 783 numDatapoints := 1000 784 numRanges := 1000 785 timeInterval := int64(2) 786 // Create 8 series with 1000 data-points of different ranges, delete and run queries. 787 lbls := [][]labels.Label{ 788 { 789 {Name: "a", Value: "b"}, 790 {Name: "instance", Value: "localhost:9090"}, 791 {Name: "job", Value: "prometheus"}, 792 }, 793 { 794 {Name: "a", Value: "b"}, 795 {Name: "instance", Value: "127.0.0.1:9090"}, 796 {Name: "job", Value: "prometheus"}, 797 }, 798 { 799 {Name: "a", Value: "b"}, 800 {Name: "instance", Value: "127.0.0.1:9090"}, 801 {Name: "job", Value: "prom-k8s"}, 802 }, 803 { 804 {Name: "a", Value: "b"}, 805 {Name: "instance", Value: "localhost:9090"}, 806 {Name: "job", Value: "prom-k8s"}, 807 }, 808 { 809 {Name: "a", Value: "c"}, 810 {Name: "instance", Value: "localhost:9090"}, 811 {Name: "job", Value: "prometheus"}, 812 }, 813 { 814 {Name: "a", Value: "c"}, 815 {Name: "instance", Value: "127.0.0.1:9090"}, 816 {Name: "job", Value: "prometheus"}, 817 }, 818 { 819 {Name: "a", Value: "c"}, 820 {Name: "instance", Value: "127.0.0.1:9090"}, 821 {Name: "job", Value: "prom-k8s"}, 822 }, 823 { 824 {Name: "a", Value: "c"}, 825 {Name: "instance", Value: "localhost:9090"}, 826 {Name: "job", Value: "prom-k8s"}, 827 }, 828 } 829 seriesMap := map[string][]tsdbutil.Sample{} 830 for _, l := range lbls { 831 seriesMap[labels.New(l...).String()] = []tsdbutil.Sample{} 832 } 833 834 hb, _ := newTestHead(t, 100000, false) 835 defer func() { 836 require.NoError(t, hb.Close()) 837 }() 838 839 app := hb.Appender(context.Background()) 840 for _, l := range lbls { 841 ls := labels.New(l...) 842 series := []tsdbutil.Sample{} 843 ts := rand.Int63n(300) 844 for i := 0; i < numDatapoints; i++ { 845 v := rand.Float64() 846 _, err := app.Append(0, ls, ts, v) 847 require.NoError(t, err) 848 series = append(series, sample{ts, v}) 849 ts += rand.Int63n(timeInterval) + 1 850 } 851 seriesMap[labels.New(l...).String()] = series 852 } 853 require.NoError(t, app.Commit()) 854 // Delete a time-range from each-selector. 855 dels := []struct { 856 ms []*labels.Matcher 857 drange tombstones.Intervals 858 }{ 859 { 860 ms: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "a", "b")}, 861 drange: tombstones.Intervals{{Mint: 300, Maxt: 500}, {Mint: 600, Maxt: 670}}, 862 }, 863 { 864 ms: []*labels.Matcher{ 865 labels.MustNewMatcher(labels.MatchEqual, "a", "b"), 866 labels.MustNewMatcher(labels.MatchEqual, "job", "prom-k8s"), 867 }, 868 drange: tombstones.Intervals{{Mint: 300, Maxt: 500}, {Mint: 100, Maxt: 670}}, 869 }, 870 { 871 ms: []*labels.Matcher{ 872 labels.MustNewMatcher(labels.MatchEqual, "a", "c"), 873 labels.MustNewMatcher(labels.MatchEqual, "instance", "localhost:9090"), 874 labels.MustNewMatcher(labels.MatchEqual, "job", "prometheus"), 875 }, 876 drange: tombstones.Intervals{{Mint: 300, Maxt: 400}, {Mint: 100, Maxt: 6700}}, 877 }, 878 // TODO: Add Regexp Matchers. 879 } 880 for _, del := range dels { 881 for _, r := range del.drange { 882 require.NoError(t, hb.Delete(r.Mint, r.Maxt, del.ms...)) 883 } 884 matched := labels.Slice{} 885 for _, ls := range lbls { 886 s := labels.Selector(del.ms) 887 if s.Matches(ls) { 888 matched = append(matched, ls) 889 } 890 } 891 sort.Sort(matched) 892 for i := 0; i < numRanges; i++ { 893 q, err := NewBlockQuerier(hb, 0, 100000) 894 require.NoError(t, err) 895 defer q.Close() 896 ss := q.Select(true, nil, del.ms...) 897 // Build the mockSeriesSet. 898 matchedSeries := make([]storage.Series, 0, len(matched)) 899 for _, m := range matched { 900 smpls := seriesMap[m.String()] 901 smpls = deletedSamples(smpls, del.drange) 902 // Only append those series for which samples exist as mockSeriesSet 903 // doesn't skip series with no samples. 904 // TODO: But sometimes SeriesSet returns an empty chunkenc.Iterator 905 if len(smpls) > 0 { 906 matchedSeries = append(matchedSeries, storage.NewListSeries(m, smpls)) 907 } 908 } 909 expSs := newMockSeriesSet(matchedSeries) 910 // Compare both SeriesSets. 911 for { 912 eok, rok := expSs.Next(), ss.Next() 913 // Skip a series if iterator is empty. 914 if rok { 915 for !ss.At().Iterator().Next() { 916 rok = ss.Next() 917 if !rok { 918 break 919 } 920 } 921 } 922 require.Equal(t, eok, rok) 923 if !eok { 924 break 925 } 926 sexp := expSs.At() 927 sres := ss.At() 928 require.Equal(t, sexp.Labels(), sres.Labels()) 929 smplExp, errExp := storage.ExpandSamples(sexp.Iterator(), nil) 930 smplRes, errRes := storage.ExpandSamples(sres.Iterator(), nil) 931 require.Equal(t, errExp, errRes) 932 require.Equal(t, smplExp, smplRes) 933 } 934 require.NoError(t, ss.Err()) 935 require.Equal(t, 0, len(ss.Warnings())) 936 } 937 } 938} 939 940func boundedSamples(full []tsdbutil.Sample, mint, maxt int64) []tsdbutil.Sample { 941 for len(full) > 0 { 942 if full[0].T() >= mint { 943 break 944 } 945 full = full[1:] 946 } 947 for i, s := range full { 948 // labels.Labelinate on the first sample larger than maxt. 949 if s.T() > maxt { 950 return full[:i] 951 } 952 } 953 // maxt is after highest sample. 954 return full 955} 956 957func deletedSamples(full []tsdbutil.Sample, dranges tombstones.Intervals) []tsdbutil.Sample { 958 ds := make([]tsdbutil.Sample, 0, len(full)) 959Outer: 960 for _, s := range full { 961 for _, r := range dranges { 962 if r.InBounds(s.T()) { 963 continue Outer 964 } 965 } 966 ds = append(ds, s) 967 } 968 969 return ds 970} 971 972func TestComputeChunkEndTime(t *testing.T) { 973 cases := []struct { 974 start, cur, max int64 975 res int64 976 }{ 977 { 978 start: 0, 979 cur: 250, 980 max: 1000, 981 res: 1000, 982 }, 983 { 984 start: 100, 985 cur: 200, 986 max: 1000, 987 res: 550, 988 }, 989 // Case where we fit floored 0 chunks. Must catch division by 0 990 // and default to maximum time. 991 { 992 start: 0, 993 cur: 500, 994 max: 1000, 995 res: 1000, 996 }, 997 // Catch division by zero for cur == start. Strictly not a possible case. 998 { 999 start: 100, 1000 cur: 100, 1001 max: 1000, 1002 res: 104, 1003 }, 1004 } 1005 1006 for _, c := range cases { 1007 got := computeChunkEndTime(c.start, c.cur, c.max) 1008 if got != c.res { 1009 t.Errorf("expected %d for (start: %d, cur: %d, max: %d), got %d", c.res, c.start, c.cur, c.max, got) 1010 } 1011 } 1012} 1013 1014func TestMemSeries_append(t *testing.T) { 1015 dir, err := ioutil.TempDir("", "append") 1016 require.NoError(t, err) 1017 defer func() { 1018 require.NoError(t, os.RemoveAll(dir)) 1019 }() 1020 // This is usually taken from the Head, but passing manually here. 1021 chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) 1022 require.NoError(t, err) 1023 defer func() { 1024 require.NoError(t, chunkDiskMapper.Close()) 1025 }() 1026 1027 s := newMemSeries(labels.Labels{}, 1, 500, nil) 1028 1029 // Add first two samples at the very end of a chunk range and the next two 1030 // on and after it. 1031 // New chunk must correctly be cut at 1000. 1032 ok, chunkCreated := s.append(998, 1, 0, chunkDiskMapper) 1033 require.True(t, ok, "append failed") 1034 require.True(t, chunkCreated, "first sample created chunk") 1035 1036 ok, chunkCreated = s.append(999, 2, 0, chunkDiskMapper) 1037 require.True(t, ok, "append failed") 1038 require.False(t, chunkCreated, "second sample should use same chunk") 1039 1040 ok, chunkCreated = s.append(1000, 3, 0, chunkDiskMapper) 1041 require.True(t, ok, "append failed") 1042 require.True(t, chunkCreated, "expected new chunk on boundary") 1043 1044 ok, chunkCreated = s.append(1001, 4, 0, chunkDiskMapper) 1045 require.True(t, ok, "append failed") 1046 require.False(t, chunkCreated, "second sample should use same chunk") 1047 1048 require.Equal(t, 1, len(s.mmappedChunks), "there should be only 1 mmapped chunk") 1049 require.Equal(t, int64(998), s.mmappedChunks[0].minTime, "wrong chunk range") 1050 require.Equal(t, int64(999), s.mmappedChunks[0].maxTime, "wrong chunk range") 1051 require.Equal(t, int64(1000), s.headChunk.minTime, "wrong chunk range") 1052 require.Equal(t, int64(1001), s.headChunk.maxTime, "wrong chunk range") 1053 1054 // Fill the range [1000,2000) with many samples. Intermediate chunks should be cut 1055 // at approximately 120 samples per chunk. 1056 for i := 1; i < 1000; i++ { 1057 ok, _ := s.append(1001+int64(i), float64(i), 0, chunkDiskMapper) 1058 require.True(t, ok, "append failed") 1059 } 1060 1061 require.Greater(t, len(s.mmappedChunks)+1, 7, "expected intermediate chunks") 1062 1063 // All chunks but the first and last should now be moderately full. 1064 for i, c := range s.mmappedChunks[1:] { 1065 chk, err := chunkDiskMapper.Chunk(c.ref) 1066 require.NoError(t, err) 1067 require.Greater(t, chk.NumSamples(), 100, "unexpected small chunk %d of length %d", i, chk.NumSamples()) 1068 } 1069} 1070 1071func TestGCChunkAccess(t *testing.T) { 1072 // Put a chunk, select it. GC it and then access it. 1073 h, _ := newTestHead(t, 1000, false) 1074 defer func() { 1075 require.NoError(t, h.Close()) 1076 }() 1077 1078 h.initTime(0) 1079 1080 s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) 1081 1082 // Appending 2 samples for the first chunk. 1083 ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) 1084 require.True(t, ok, "series append failed") 1085 require.True(t, chunkCreated, "chunks was not created") 1086 ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper) 1087 require.True(t, ok, "series append failed") 1088 require.False(t, chunkCreated, "chunks was created") 1089 1090 // A new chunks should be created here as it's beyond the chunk range. 1091 ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper) 1092 require.True(t, ok, "series append failed") 1093 require.True(t, chunkCreated, "chunks was not created") 1094 ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper) 1095 require.True(t, ok, "series append failed") 1096 require.False(t, chunkCreated, "chunks was created") 1097 1098 idx := h.indexRange(0, 1500) 1099 var ( 1100 lset labels.Labels 1101 chunks []chunks.Meta 1102 ) 1103 require.NoError(t, idx.Series(1, &lset, &chunks)) 1104 1105 require.Equal(t, labels.Labels{{ 1106 Name: "a", Value: "1", 1107 }}, lset) 1108 require.Equal(t, 2, len(chunks)) 1109 1110 cr, err := h.chunksRange(0, 1500, nil) 1111 require.NoError(t, err) 1112 _, err = cr.Chunk(chunks[0].Ref) 1113 require.NoError(t, err) 1114 _, err = cr.Chunk(chunks[1].Ref) 1115 require.NoError(t, err) 1116 1117 require.NoError(t, h.Truncate(1500)) // Remove a chunk. 1118 1119 _, err = cr.Chunk(chunks[0].Ref) 1120 require.Equal(t, storage.ErrNotFound, err) 1121 _, err = cr.Chunk(chunks[1].Ref) 1122 require.NoError(t, err) 1123} 1124 1125func TestGCSeriesAccess(t *testing.T) { 1126 // Put a series, select it. GC it and then access it. 1127 h, _ := newTestHead(t, 1000, false) 1128 defer func() { 1129 require.NoError(t, h.Close()) 1130 }() 1131 1132 h.initTime(0) 1133 1134 s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) 1135 1136 // Appending 2 samples for the first chunk. 1137 ok, chunkCreated := s.append(0, 0, 0, h.chunkDiskMapper) 1138 require.True(t, ok, "series append failed") 1139 require.True(t, chunkCreated, "chunks was not created") 1140 ok, chunkCreated = s.append(999, 999, 0, h.chunkDiskMapper) 1141 require.True(t, ok, "series append failed") 1142 require.False(t, chunkCreated, "chunks was created") 1143 1144 // A new chunks should be created here as it's beyond the chunk range. 1145 ok, chunkCreated = s.append(1000, 1000, 0, h.chunkDiskMapper) 1146 require.True(t, ok, "series append failed") 1147 require.True(t, chunkCreated, "chunks was not created") 1148 ok, chunkCreated = s.append(1999, 1999, 0, h.chunkDiskMapper) 1149 require.True(t, ok, "series append failed") 1150 require.False(t, chunkCreated, "chunks was created") 1151 1152 idx := h.indexRange(0, 2000) 1153 var ( 1154 lset labels.Labels 1155 chunks []chunks.Meta 1156 ) 1157 require.NoError(t, idx.Series(1, &lset, &chunks)) 1158 1159 require.Equal(t, labels.Labels{{ 1160 Name: "a", Value: "1", 1161 }}, lset) 1162 require.Equal(t, 2, len(chunks)) 1163 1164 cr, err := h.chunksRange(0, 2000, nil) 1165 require.NoError(t, err) 1166 _, err = cr.Chunk(chunks[0].Ref) 1167 require.NoError(t, err) 1168 _, err = cr.Chunk(chunks[1].Ref) 1169 require.NoError(t, err) 1170 1171 require.NoError(t, h.Truncate(2000)) // Remove the series. 1172 1173 require.Equal(t, (*memSeries)(nil), h.series.getByID(1)) 1174 1175 _, err = cr.Chunk(chunks[0].Ref) 1176 require.Equal(t, storage.ErrNotFound, err) 1177 _, err = cr.Chunk(chunks[1].Ref) 1178 require.Equal(t, storage.ErrNotFound, err) 1179} 1180 1181func TestUncommittedSamplesNotLostOnTruncate(t *testing.T) { 1182 h, _ := newTestHead(t, 1000, false) 1183 defer func() { 1184 require.NoError(t, h.Close()) 1185 }() 1186 1187 h.initTime(0) 1188 1189 app := h.appender() 1190 lset := labels.FromStrings("a", "1") 1191 _, err := app.Append(0, lset, 2100, 1) 1192 require.NoError(t, err) 1193 1194 require.NoError(t, h.Truncate(2000)) 1195 require.NotNil(t, h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") 1196 1197 require.NoError(t, app.Commit()) 1198 1199 q, err := NewBlockQuerier(h, 1500, 2500) 1200 require.NoError(t, err) 1201 defer q.Close() 1202 1203 ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) 1204 require.Equal(t, true, ss.Next()) 1205 for ss.Next() { 1206 } 1207 require.NoError(t, ss.Err()) 1208 require.Equal(t, 0, len(ss.Warnings())) 1209} 1210 1211func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { 1212 h, _ := newTestHead(t, 1000, false) 1213 defer func() { 1214 require.NoError(t, h.Close()) 1215 }() 1216 1217 h.initTime(0) 1218 1219 app := h.appender() 1220 lset := labels.FromStrings("a", "1") 1221 _, err := app.Append(0, lset, 2100, 1) 1222 require.NoError(t, err) 1223 1224 require.NoError(t, h.Truncate(2000)) 1225 require.NotNil(t, h.series.getByHash(lset.Hash(), lset), "series should not have been garbage collected") 1226 1227 require.NoError(t, app.Rollback()) 1228 1229 q, err := NewBlockQuerier(h, 1500, 2500) 1230 require.NoError(t, err) 1231 defer q.Close() 1232 1233 ss := q.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "a", "1")) 1234 require.Equal(t, false, ss.Next()) 1235 require.Equal(t, 0, len(ss.Warnings())) 1236 1237 // Truncate again, this time the series should be deleted 1238 require.NoError(t, h.Truncate(2050)) 1239 require.Equal(t, (*memSeries)(nil), h.series.getByHash(lset.Hash(), lset)) 1240} 1241 1242func TestHead_LogRollback(t *testing.T) { 1243 for _, compress := range []bool{false, true} { 1244 t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { 1245 h, w := newTestHead(t, 1000, compress) 1246 defer func() { 1247 require.NoError(t, h.Close()) 1248 }() 1249 1250 app := h.Appender(context.Background()) 1251 _, err := app.Append(0, labels.FromStrings("a", "b"), 1, 2) 1252 require.NoError(t, err) 1253 1254 require.NoError(t, app.Rollback()) 1255 recs := readTestWAL(t, w.Dir()) 1256 1257 require.Equal(t, 1, len(recs)) 1258 1259 series, ok := recs[0].([]record.RefSeries) 1260 require.True(t, ok, "expected series record but got %+v", recs[0]) 1261 require.Equal(t, []record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) 1262 }) 1263 } 1264} 1265 1266// TestWalRepair_DecodingError ensures that a repair is run for an error 1267// when decoding a record. 1268func TestWalRepair_DecodingError(t *testing.T) { 1269 var enc record.Encoder 1270 for name, test := range map[string]struct { 1271 corrFunc func(rec []byte) []byte // Func that applies the corruption to a record. 1272 rec []byte 1273 totalRecs int 1274 expRecs int 1275 }{ 1276 "decode_series": { 1277 func(rec []byte) []byte { 1278 return rec[:3] 1279 }, 1280 enc.Series([]record.RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 1281 9, 1282 5, 1283 }, 1284 "decode_samples": { 1285 func(rec []byte) []byte { 1286 return rec[:3] 1287 }, 1288 enc.Samples([]record.RefSample{{Ref: 0, T: 99, V: 1}}, []byte{}), 1289 9, 1290 5, 1291 }, 1292 "decode_tombstone": { 1293 func(rec []byte) []byte { 1294 return rec[:3] 1295 }, 1296 enc.Tombstones([]tombstones.Stone{{Ref: 1, Intervals: tombstones.Intervals{}}}, []byte{}), 1297 9, 1298 5, 1299 }, 1300 } { 1301 for _, compress := range []bool{false, true} { 1302 t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) { 1303 dir, err := ioutil.TempDir("", "wal_repair") 1304 require.NoError(t, err) 1305 defer func() { 1306 require.NoError(t, os.RemoveAll(dir)) 1307 }() 1308 1309 // Fill the wal and corrupt it. 1310 { 1311 w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress) 1312 require.NoError(t, err) 1313 1314 for i := 1; i <= test.totalRecs; i++ { 1315 // At this point insert a corrupted record. 1316 if i-1 == test.expRecs { 1317 require.NoError(t, w.Log(test.corrFunc(test.rec))) 1318 continue 1319 } 1320 require.NoError(t, w.Log(test.rec)) 1321 } 1322 1323 opts := DefaultHeadOptions() 1324 opts.ChunkRange = 1 1325 opts.ChunkDirRoot = w.Dir() 1326 h, err := NewHead(nil, nil, w, opts, nil) 1327 require.NoError(t, err) 1328 require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) 1329 initErr := h.Init(math.MinInt64) 1330 1331 err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. 1332 _, corrErr := err.(*wal.CorruptionErr) 1333 require.True(t, corrErr, "reading the wal didn't return corruption error") 1334 require.NoError(t, w.Close()) 1335 } 1336 1337 // Open the db to trigger a repair. 1338 { 1339 db, err := Open(dir, nil, nil, DefaultOptions(), nil) 1340 require.NoError(t, err) 1341 defer func() { 1342 require.NoError(t, db.Close()) 1343 }() 1344 require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal)) 1345 } 1346 1347 // Read the wal content after the repair. 1348 { 1349 sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) 1350 require.NoError(t, err) 1351 defer sr.Close() 1352 r := wal.NewReader(sr) 1353 1354 var actRec int 1355 for r.Next() { 1356 actRec++ 1357 } 1358 require.NoError(t, r.Err()) 1359 require.Equal(t, test.expRecs, actRec, "Wrong number of intact records") 1360 } 1361 }) 1362 } 1363 } 1364} 1365 1366func TestHeadReadWriterRepair(t *testing.T) { 1367 dir, err := ioutil.TempDir("", "head_read_writer_repair") 1368 require.NoError(t, err) 1369 defer func() { 1370 require.NoError(t, os.RemoveAll(dir)) 1371 }() 1372 1373 const chunkRange = 1000 1374 1375 walDir := filepath.Join(dir, "wal") 1376 // Fill the chunk segments and corrupt it. 1377 { 1378 w, err := wal.New(nil, nil, walDir, false) 1379 require.NoError(t, err) 1380 1381 opts := DefaultHeadOptions() 1382 opts.ChunkRange = chunkRange 1383 opts.ChunkDirRoot = dir 1384 h, err := NewHead(nil, nil, w, opts, nil) 1385 require.NoError(t, err) 1386 require.Equal(t, 0.0, prom_testutil.ToFloat64(h.metrics.mmapChunkCorruptionTotal)) 1387 require.NoError(t, h.Init(math.MinInt64)) 1388 1389 s, created, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) 1390 require.True(t, created, "series was not created") 1391 1392 for i := 0; i < 7; i++ { 1393 ok, chunkCreated := s.append(int64(i*chunkRange), float64(i*chunkRange), 0, h.chunkDiskMapper) 1394 require.True(t, ok, "series append failed") 1395 require.True(t, chunkCreated, "chunk was not created") 1396 ok, chunkCreated = s.append(int64(i*chunkRange)+chunkRange-1, float64(i*chunkRange), 0, h.chunkDiskMapper) 1397 require.True(t, ok, "series append failed") 1398 require.False(t, chunkCreated, "chunk was created") 1399 require.NoError(t, h.chunkDiskMapper.CutNewFile()) 1400 } 1401 require.NoError(t, h.Close()) 1402 1403 // Verify that there are 7 segment files. 1404 files, err := ioutil.ReadDir(mmappedChunksDir(dir)) 1405 require.NoError(t, err) 1406 require.Equal(t, 7, len(files)) 1407 1408 // Corrupt the 4th file by writing a random byte to series ref. 1409 f, err := os.OpenFile(filepath.Join(mmappedChunksDir(dir), files[3].Name()), os.O_WRONLY, 0666) 1410 require.NoError(t, err) 1411 n, err := f.WriteAt([]byte{67, 88}, chunks.HeadChunkFileHeaderSize+2) 1412 require.NoError(t, err) 1413 require.Equal(t, 2, n) 1414 require.NoError(t, f.Close()) 1415 } 1416 1417 // Open the db to trigger a repair. 1418 { 1419 db, err := Open(dir, nil, nil, DefaultOptions(), nil) 1420 require.NoError(t, err) 1421 defer func() { 1422 require.NoError(t, db.Close()) 1423 }() 1424 require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.mmapChunkCorruptionTotal)) 1425 } 1426 1427 // Verify that there are 3 segment files after the repair. 1428 // The segments from the corrupt segment should be removed. 1429 { 1430 files, err := ioutil.ReadDir(mmappedChunksDir(dir)) 1431 require.NoError(t, err) 1432 require.Equal(t, 3, len(files)) 1433 } 1434} 1435 1436func TestNewWalSegmentOnTruncate(t *testing.T) { 1437 h, wlog := newTestHead(t, 1000, false) 1438 defer func() { 1439 require.NoError(t, h.Close()) 1440 }() 1441 add := func(ts int64) { 1442 app := h.Appender(context.Background()) 1443 _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, ts, 0) 1444 require.NoError(t, err) 1445 require.NoError(t, app.Commit()) 1446 } 1447 1448 add(0) 1449 _, last, err := wal.Segments(wlog.Dir()) 1450 require.NoError(t, err) 1451 require.Equal(t, 0, last) 1452 1453 add(1) 1454 require.NoError(t, h.Truncate(1)) 1455 _, last, err = wal.Segments(wlog.Dir()) 1456 require.NoError(t, err) 1457 require.Equal(t, 1, last) 1458 1459 add(2) 1460 require.NoError(t, h.Truncate(2)) 1461 _, last, err = wal.Segments(wlog.Dir()) 1462 require.NoError(t, err) 1463 require.Equal(t, 2, last) 1464} 1465 1466func TestAddDuplicateLabelName(t *testing.T) { 1467 h, _ := newTestHead(t, 1000, false) 1468 defer func() { 1469 require.NoError(t, h.Close()) 1470 }() 1471 1472 add := func(labels labels.Labels, labelName string) { 1473 app := h.Appender(context.Background()) 1474 _, err := app.Append(0, labels, 0, 0) 1475 require.Error(t, err) 1476 require.Equal(t, fmt.Sprintf(`label name "%s" is not unique: invalid sample`, labelName), err.Error()) 1477 } 1478 1479 add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "b"}}, "a") 1480 add(labels.Labels{{Name: "a", Value: "c"}, {Name: "a", Value: "c"}}, "a") 1481 add(labels.Labels{{Name: "__name__", Value: "up"}, {Name: "job", Value: "prometheus"}, {Name: "le", Value: "500"}, {Name: "le", Value: "400"}, {Name: "unit", Value: "s"}}, "le") 1482} 1483 1484func TestMemSeriesIsolation(t *testing.T) { 1485 // Put a series, select it. GC it and then access it. 1486 lastValue := func(h *Head, maxAppendID uint64) int { 1487 idx, err := h.Index() 1488 1489 require.NoError(t, err) 1490 1491 iso := h.iso.State() 1492 iso.maxAppendID = maxAppendID 1493 1494 chunks, err := h.chunksRange(math.MinInt64, math.MaxInt64, iso) 1495 require.NoError(t, err) 1496 // Hm.. here direct block chunk querier might be required? 1497 querier := blockQuerier{ 1498 blockBaseQuerier: &blockBaseQuerier{ 1499 index: idx, 1500 chunks: chunks, 1501 tombstones: tombstones.NewMemTombstones(), 1502 1503 mint: 0, 1504 maxt: 10000, 1505 }, 1506 } 1507 1508 require.NoError(t, err) 1509 defer querier.Close() 1510 1511 ss := querier.Select(false, nil, labels.MustNewMatcher(labels.MatchEqual, "foo", "bar")) 1512 _, seriesSet, ws, err := expandSeriesSet(ss) 1513 require.NoError(t, err) 1514 require.Equal(t, 0, len(ws)) 1515 1516 for _, series := range seriesSet { 1517 return int(series[len(series)-1].v) 1518 } 1519 return -1 1520 } 1521 1522 addSamples := func(h *Head) int { 1523 i := 1 1524 for ; i <= 1000; i++ { 1525 var app storage.Appender 1526 // To initialize bounds. 1527 if h.MinTime() == math.MaxInt64 { 1528 app = &initAppender{head: h} 1529 } else { 1530 a := h.appender() 1531 a.cleanupAppendIDsBelow = 0 1532 app = a 1533 } 1534 1535 _, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) 1536 require.NoError(t, err) 1537 require.NoError(t, app.Commit()) 1538 } 1539 return i 1540 } 1541 1542 testIsolation := func(h *Head, i int) { 1543 } 1544 1545 // Test isolation without restart of Head. 1546 hb, _ := newTestHead(t, 1000, false) 1547 i := addSamples(hb) 1548 testIsolation(hb, i) 1549 1550 // Test simple cases in different chunks when no appendID cleanup has been performed. 1551 require.Equal(t, 10, lastValue(hb, 10)) 1552 require.Equal(t, 130, lastValue(hb, 130)) 1553 require.Equal(t, 160, lastValue(hb, 160)) 1554 require.Equal(t, 240, lastValue(hb, 240)) 1555 require.Equal(t, 500, lastValue(hb, 500)) 1556 require.Equal(t, 750, lastValue(hb, 750)) 1557 require.Equal(t, 995, lastValue(hb, 995)) 1558 require.Equal(t, 999, lastValue(hb, 999)) 1559 1560 // Cleanup appendIDs below 500. 1561 app := hb.appender() 1562 app.cleanupAppendIDsBelow = 500 1563 _, err := app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) 1564 require.NoError(t, err) 1565 require.NoError(t, app.Commit()) 1566 i++ 1567 1568 // We should not get queries with a maxAppendID below 500 after the cleanup, 1569 // but they only take the remaining appendIDs into account. 1570 require.Equal(t, 499, lastValue(hb, 10)) 1571 require.Equal(t, 499, lastValue(hb, 130)) 1572 require.Equal(t, 499, lastValue(hb, 160)) 1573 require.Equal(t, 499, lastValue(hb, 240)) 1574 require.Equal(t, 500, lastValue(hb, 500)) 1575 require.Equal(t, 995, lastValue(hb, 995)) 1576 require.Equal(t, 999, lastValue(hb, 999)) 1577 1578 // Cleanup appendIDs below 1000, which means the sample buffer is 1579 // the only thing with appendIDs. 1580 app = hb.appender() 1581 app.cleanupAppendIDsBelow = 1000 1582 _, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) 1583 require.NoError(t, err) 1584 require.NoError(t, app.Commit()) 1585 require.Equal(t, 999, lastValue(hb, 998)) 1586 require.Equal(t, 999, lastValue(hb, 999)) 1587 require.Equal(t, 1000, lastValue(hb, 1000)) 1588 require.Equal(t, 1001, lastValue(hb, 1001)) 1589 require.Equal(t, 1002, lastValue(hb, 1002)) 1590 require.Equal(t, 1002, lastValue(hb, 1003)) 1591 1592 i++ 1593 // Cleanup appendIDs below 1001, but with a rollback. 1594 app = hb.appender() 1595 app.cleanupAppendIDsBelow = 1001 1596 _, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) 1597 require.NoError(t, err) 1598 require.NoError(t, app.Rollback()) 1599 require.Equal(t, 1000, lastValue(hb, 999)) 1600 require.Equal(t, 1000, lastValue(hb, 1000)) 1601 require.Equal(t, 1001, lastValue(hb, 1001)) 1602 require.Equal(t, 1002, lastValue(hb, 1002)) 1603 require.Equal(t, 1002, lastValue(hb, 1003)) 1604 1605 require.NoError(t, hb.Close()) 1606 1607 // Test isolation with restart of Head. This is to verify the num samples of chunks after m-map chunk replay. 1608 hb, w := newTestHead(t, 1000, false) 1609 i = addSamples(hb) 1610 require.NoError(t, hb.Close()) 1611 1612 wlog, err := wal.NewSize(nil, nil, w.Dir(), 32768, false) 1613 require.NoError(t, err) 1614 opts := DefaultHeadOptions() 1615 opts.ChunkRange = 1000 1616 opts.ChunkDirRoot = wlog.Dir() 1617 hb, err = NewHead(nil, nil, wlog, opts, nil) 1618 defer func() { require.NoError(t, hb.Close()) }() 1619 require.NoError(t, err) 1620 require.NoError(t, hb.Init(0)) 1621 1622 // No appends after restarting. Hence all should return the last value. 1623 require.Equal(t, 1000, lastValue(hb, 10)) 1624 require.Equal(t, 1000, lastValue(hb, 130)) 1625 require.Equal(t, 1000, lastValue(hb, 160)) 1626 require.Equal(t, 1000, lastValue(hb, 240)) 1627 require.Equal(t, 1000, lastValue(hb, 500)) 1628 1629 // Cleanup appendIDs below 1000, which means the sample buffer is 1630 // the only thing with appendIDs. 1631 app = hb.appender() 1632 _, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) 1633 i++ 1634 require.NoError(t, err) 1635 require.NoError(t, app.Commit()) 1636 require.Equal(t, 1001, lastValue(hb, 998)) 1637 require.Equal(t, 1001, lastValue(hb, 999)) 1638 require.Equal(t, 1001, lastValue(hb, 1000)) 1639 require.Equal(t, 1001, lastValue(hb, 1001)) 1640 require.Equal(t, 1001, lastValue(hb, 1002)) 1641 require.Equal(t, 1001, lastValue(hb, 1003)) 1642 1643 // Cleanup appendIDs below 1002, but with a rollback. 1644 app = hb.appender() 1645 _, err = app.Append(0, labels.FromStrings("foo", "bar"), int64(i), float64(i)) 1646 require.NoError(t, err) 1647 require.NoError(t, app.Rollback()) 1648 require.Equal(t, 1001, lastValue(hb, 999)) 1649 require.Equal(t, 1001, lastValue(hb, 1000)) 1650 require.Equal(t, 1001, lastValue(hb, 1001)) 1651 require.Equal(t, 1001, lastValue(hb, 1002)) 1652 require.Equal(t, 1001, lastValue(hb, 1003)) 1653} 1654 1655func TestIsolationRollback(t *testing.T) { 1656 // Rollback after a failed append and test if the low watermark has progressed anyway. 1657 hb, _ := newTestHead(t, 1000, false) 1658 defer func() { 1659 require.NoError(t, hb.Close()) 1660 }() 1661 1662 app := hb.Appender(context.Background()) 1663 _, err := app.Append(0, labels.FromStrings("foo", "bar"), 0, 0) 1664 require.NoError(t, err) 1665 require.NoError(t, app.Commit()) 1666 require.Equal(t, uint64(1), hb.iso.lowWatermark()) 1667 1668 app = hb.Appender(context.Background()) 1669 _, err = app.Append(0, labels.FromStrings("foo", "bar"), 1, 1) 1670 require.NoError(t, err) 1671 _, err = app.Append(0, labels.FromStrings("foo", "bar", "foo", "baz"), 2, 2) 1672 require.Error(t, err) 1673 require.NoError(t, app.Rollback()) 1674 require.Equal(t, uint64(2), hb.iso.lowWatermark()) 1675 1676 app = hb.Appender(context.Background()) 1677 _, err = app.Append(0, labels.FromStrings("foo", "bar"), 3, 3) 1678 require.NoError(t, err) 1679 require.NoError(t, app.Commit()) 1680 require.Equal(t, uint64(3), hb.iso.lowWatermark(), "Low watermark should proceed to 3 even if append #2 was rolled back.") 1681} 1682 1683func TestIsolationLowWatermarkMonotonous(t *testing.T) { 1684 hb, _ := newTestHead(t, 1000, false) 1685 defer func() { 1686 require.NoError(t, hb.Close()) 1687 }() 1688 1689 app1 := hb.Appender(context.Background()) 1690 _, err := app1.Append(0, labels.FromStrings("foo", "bar"), 0, 0) 1691 require.NoError(t, err) 1692 require.NoError(t, app1.Commit()) 1693 require.Equal(t, uint64(1), hb.iso.lowWatermark(), "Low watermark should by 1 after 1st append.") 1694 1695 app1 = hb.Appender(context.Background()) 1696 _, err = app1.Append(0, labels.FromStrings("foo", "bar"), 1, 1) 1697 require.NoError(t, err) 1698 require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should be two, even if append is not committed yet.") 1699 1700 app2 := hb.Appender(context.Background()) 1701 _, err = app2.Append(0, labels.FromStrings("foo", "baz"), 1, 1) 1702 require.NoError(t, err) 1703 require.NoError(t, app2.Commit()) 1704 require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Low watermark should stay two because app1 is not committed yet.") 1705 1706 is := hb.iso.State() 1707 require.Equal(t, uint64(2), hb.iso.lowWatermark(), "After simulated read (iso state retrieved), low watermark should stay at 2.") 1708 1709 require.NoError(t, app1.Commit()) 1710 require.Equal(t, uint64(2), hb.iso.lowWatermark(), "Even after app1 is committed, low watermark should stay at 2 because read is still ongoing.") 1711 1712 is.Close() 1713 require.Equal(t, uint64(3), hb.iso.lowWatermark(), "After read has finished (iso state closed), low watermark should jump to three.") 1714} 1715 1716func TestIsolationAppendIDZeroIsNoop(t *testing.T) { 1717 h, _ := newTestHead(t, 1000, false) 1718 defer func() { 1719 require.NoError(t, h.Close()) 1720 }() 1721 1722 h.initTime(0) 1723 1724 s, _, _ := h.getOrCreate(1, labels.FromStrings("a", "1")) 1725 1726 ok, _ := s.append(0, 0, 0, h.chunkDiskMapper) 1727 require.True(t, ok, "Series append failed.") 1728 require.Equal(t, 0, s.txs.txIDCount, "Series should not have an appendID after append with appendID=0.") 1729} 1730 1731func TestHeadSeriesChunkRace(t *testing.T) { 1732 for i := 0; i < 1000; i++ { 1733 testHeadSeriesChunkRace(t) 1734 } 1735} 1736 1737func TestIsolationWithoutAdd(t *testing.T) { 1738 hb, _ := newTestHead(t, 1000, false) 1739 defer func() { 1740 require.NoError(t, hb.Close()) 1741 }() 1742 1743 app := hb.Appender(context.Background()) 1744 require.NoError(t, app.Commit()) 1745 1746 app = hb.Appender(context.Background()) 1747 _, err := app.Append(0, labels.FromStrings("foo", "baz"), 1, 1) 1748 require.NoError(t, err) 1749 require.NoError(t, app.Commit()) 1750 1751 require.Equal(t, hb.iso.lastAppendID(), hb.iso.lowWatermark(), "High watermark should be equal to the low watermark") 1752} 1753 1754func TestOutOfOrderSamplesMetric(t *testing.T) { 1755 dir, err := ioutil.TempDir("", "test") 1756 require.NoError(t, err) 1757 defer func() { 1758 require.NoError(t, os.RemoveAll(dir)) 1759 }() 1760 1761 db, err := Open(dir, nil, nil, DefaultOptions(), nil) 1762 require.NoError(t, err) 1763 defer func() { 1764 require.NoError(t, db.Close()) 1765 }() 1766 db.DisableCompactions() 1767 1768 ctx := context.Background() 1769 app := db.Appender(ctx) 1770 for i := 1; i <= 5; i++ { 1771 _, err = app.Append(0, labels.FromStrings("a", "b"), int64(i), 99) 1772 require.NoError(t, err) 1773 } 1774 require.NoError(t, app.Commit()) 1775 1776 // Test out of order metric. 1777 require.Equal(t, 0.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) 1778 app = db.Appender(ctx) 1779 _, err = app.Append(0, labels.FromStrings("a", "b"), 2, 99) 1780 require.Equal(t, storage.ErrOutOfOrderSample, err) 1781 require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) 1782 1783 _, err = app.Append(0, labels.FromStrings("a", "b"), 3, 99) 1784 require.Equal(t, storage.ErrOutOfOrderSample, err) 1785 require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) 1786 1787 _, err = app.Append(0, labels.FromStrings("a", "b"), 4, 99) 1788 require.Equal(t, storage.ErrOutOfOrderSample, err) 1789 require.Equal(t, 3.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) 1790 require.NoError(t, app.Commit()) 1791 1792 // Compact Head to test out of bound metric. 1793 app = db.Appender(ctx) 1794 _, err = app.Append(0, labels.FromStrings("a", "b"), DefaultBlockDuration*2, 99) 1795 require.NoError(t, err) 1796 require.NoError(t, app.Commit()) 1797 1798 require.Equal(t, int64(math.MinInt64), db.head.minValidTime.Load()) 1799 require.NoError(t, db.Compact()) 1800 require.Greater(t, db.head.minValidTime.Load(), int64(0)) 1801 1802 app = db.Appender(ctx) 1803 _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-2, 99) 1804 require.Equal(t, storage.ErrOutOfBounds, err) 1805 require.Equal(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) 1806 1807 _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()-1, 99) 1808 require.Equal(t, storage.ErrOutOfBounds, err) 1809 require.Equal(t, 2.0, prom_testutil.ToFloat64(db.head.metrics.outOfBoundSamples)) 1810 require.NoError(t, app.Commit()) 1811 1812 // Some more valid samples for out of order. 1813 app = db.Appender(ctx) 1814 for i := 1; i <= 5; i++ { 1815 _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+int64(i), 99) 1816 require.NoError(t, err) 1817 } 1818 require.NoError(t, app.Commit()) 1819 1820 // Test out of order metric. 1821 app = db.Appender(ctx) 1822 _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+2, 99) 1823 require.Equal(t, storage.ErrOutOfOrderSample, err) 1824 require.Equal(t, 4.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) 1825 1826 _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+3, 99) 1827 require.Equal(t, storage.ErrOutOfOrderSample, err) 1828 require.Equal(t, 5.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) 1829 1830 _, err = app.Append(0, labels.FromStrings("a", "b"), db.head.minValidTime.Load()+DefaultBlockDuration+4, 99) 1831 require.Equal(t, storage.ErrOutOfOrderSample, err) 1832 require.Equal(t, 6.0, prom_testutil.ToFloat64(db.head.metrics.outOfOrderSamples)) 1833 require.NoError(t, app.Commit()) 1834} 1835 1836func testHeadSeriesChunkRace(t *testing.T) { 1837 h, _ := newTestHead(t, 1000, false) 1838 defer func() { 1839 require.NoError(t, h.Close()) 1840 }() 1841 require.NoError(t, h.Init(0)) 1842 app := h.Appender(context.Background()) 1843 1844 s2, err := app.Append(0, labels.FromStrings("foo2", "bar"), 5, 0) 1845 require.NoError(t, err) 1846 for ts := int64(6); ts < 11; ts++ { 1847 _, err = app.Append(s2, nil, ts, 0) 1848 require.NoError(t, err) 1849 } 1850 require.NoError(t, app.Commit()) 1851 1852 var wg sync.WaitGroup 1853 matcher := labels.MustNewMatcher(labels.MatchEqual, "", "") 1854 q, err := NewBlockQuerier(h, 18, 22) 1855 require.NoError(t, err) 1856 defer q.Close() 1857 1858 wg.Add(1) 1859 go func() { 1860 h.updateMinMaxTime(20, 25) 1861 h.gc() 1862 wg.Done() 1863 }() 1864 ss := q.Select(false, nil, matcher) 1865 for ss.Next() { 1866 } 1867 require.NoError(t, ss.Err()) 1868 wg.Wait() 1869} 1870 1871func TestHeadLabelNamesValuesWithMinMaxRange(t *testing.T) { 1872 head, _ := newTestHead(t, 1000, false) 1873 defer func() { 1874 require.NoError(t, head.Close()) 1875 }() 1876 1877 const ( 1878 firstSeriesTimestamp int64 = 100 1879 secondSeriesTimestamp int64 = 200 1880 lastSeriesTimestamp int64 = 300 1881 ) 1882 var ( 1883 seriesTimestamps = []int64{firstSeriesTimestamp, 1884 secondSeriesTimestamp, 1885 lastSeriesTimestamp, 1886 } 1887 expectedLabelNames = []string{"a", "b", "c"} 1888 expectedLabelValues = []string{"d", "e", "f"} 1889 ) 1890 1891 app := head.Appender(context.Background()) 1892 for i, name := range expectedLabelNames { 1893 _, err := app.Append(0, labels.Labels{{Name: name, Value: expectedLabelValues[i]}}, seriesTimestamps[i], 0) 1894 require.NoError(t, err) 1895 } 1896 require.NoError(t, app.Commit()) 1897 require.Equal(t, head.MinTime(), firstSeriesTimestamp) 1898 require.Equal(t, head.MaxTime(), lastSeriesTimestamp) 1899 1900 var testCases = []struct { 1901 name string 1902 mint int64 1903 maxt int64 1904 expectedNames []string 1905 expectedValues []string 1906 }{ 1907 {"maxt less than head min", head.MaxTime() - 10, head.MinTime() - 10, []string{}, []string{}}, 1908 {"mint less than head max", head.MaxTime() + 10, head.MinTime() + 10, []string{}, []string{}}, 1909 {"mint and maxt outside head", head.MaxTime() + 10, head.MinTime() - 10, []string{}, []string{}}, 1910 {"mint and maxt within head", head.MaxTime() - 10, head.MinTime() + 10, expectedLabelNames, expectedLabelValues}, 1911 } 1912 1913 for _, tt := range testCases { 1914 t.Run(tt.name, func(t *testing.T) { 1915 headIdxReader := head.indexRange(tt.mint, tt.maxt) 1916 actualLabelNames, err := headIdxReader.LabelNames() 1917 require.NoError(t, err) 1918 require.Equal(t, tt.expectedNames, actualLabelNames) 1919 if len(tt.expectedValues) > 0 { 1920 for i, name := range expectedLabelNames { 1921 actualLabelValue, err := headIdxReader.SortedLabelValues(name) 1922 require.NoError(t, err) 1923 require.Equal(t, []string{tt.expectedValues[i]}, actualLabelValue) 1924 } 1925 } 1926 }) 1927 } 1928} 1929 1930func TestHeadLabelValuesWithMatchers(t *testing.T) { 1931 head, _ := newTestHead(t, 1000, false) 1932 defer func() { 1933 require.NoError(t, head.Close()) 1934 }() 1935 1936 app := head.Appender(context.Background()) 1937 for i := 0; i < 100; i++ { 1938 _, err := app.Append(0, labels.Labels{ 1939 {Name: "unique", Value: fmt.Sprintf("value%d", i)}, 1940 {Name: "tens", Value: fmt.Sprintf("value%d", i/10)}, 1941 }, 100, 0) 1942 require.NoError(t, err) 1943 } 1944 require.NoError(t, app.Commit()) 1945 1946 var testCases = []struct { 1947 name string 1948 labelName string 1949 matchers []*labels.Matcher 1950 expectedValues []string 1951 }{ 1952 { 1953 name: "get tens based on unique id", 1954 labelName: "tens", 1955 matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "unique", "value35")}, 1956 expectedValues: []string{"value3"}, 1957 }, { 1958 name: "get unique ids based on a ten", 1959 labelName: "unique", 1960 matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "tens", "value1")}, 1961 expectedValues: []string{"value10", "value11", "value12", "value13", "value14", "value15", "value16", "value17", "value18", "value19"}, 1962 }, { 1963 name: "get tens by pattern matching on unique id", 1964 labelName: "tens", 1965 matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchRegexp, "unique", "value[5-7]5")}, 1966 expectedValues: []string{"value5", "value6", "value7"}, 1967 }, { 1968 name: "get tens by matching for absence of unique label", 1969 labelName: "tens", 1970 matchers: []*labels.Matcher{labels.MustNewMatcher(labels.MatchNotEqual, "unique", "")}, 1971 expectedValues: []string{"value0", "value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9"}, 1972 }, 1973 } 1974 1975 for _, tt := range testCases { 1976 t.Run(tt.name, func(t *testing.T) { 1977 headIdxReader := head.indexRange(0, 200) 1978 1979 actualValues, err := headIdxReader.SortedLabelValues(tt.labelName, tt.matchers...) 1980 require.NoError(t, err) 1981 require.Equal(t, tt.expectedValues, actualValues) 1982 1983 actualValues, err = headIdxReader.LabelValues(tt.labelName, tt.matchers...) 1984 sort.Strings(actualValues) 1985 require.NoError(t, err) 1986 require.Equal(t, tt.expectedValues, actualValues) 1987 }) 1988 } 1989} 1990 1991func TestErrReuseAppender(t *testing.T) { 1992 head, _ := newTestHead(t, 1000, false) 1993 defer func() { 1994 require.NoError(t, head.Close()) 1995 }() 1996 1997 app := head.Appender(context.Background()) 1998 _, err := app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 0, 0) 1999 require.NoError(t, err) 2000 require.NoError(t, app.Commit()) 2001 require.Error(t, app.Commit()) 2002 require.Error(t, app.Rollback()) 2003 2004 app = head.Appender(context.Background()) 2005 _, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 1, 0) 2006 require.NoError(t, err) 2007 require.NoError(t, app.Rollback()) 2008 require.Error(t, app.Rollback()) 2009 require.Error(t, app.Commit()) 2010 2011 app = head.Appender(context.Background()) 2012 _, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 2, 0) 2013 require.NoError(t, err) 2014 require.NoError(t, app.Commit()) 2015 require.Error(t, app.Rollback()) 2016 require.Error(t, app.Commit()) 2017 2018 app = head.Appender(context.Background()) 2019 _, err = app.Append(0, labels.Labels{{Name: "test", Value: "test"}}, 3, 0) 2020 require.NoError(t, err) 2021 require.NoError(t, app.Rollback()) 2022 require.Error(t, app.Commit()) 2023 require.Error(t, app.Rollback()) 2024} 2025 2026func TestHeadMintAfterTruncation(t *testing.T) { 2027 chunkRange := int64(2000) 2028 head, _ := newTestHead(t, chunkRange, false) 2029 2030 app := head.Appender(context.Background()) 2031 _, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100) 2032 require.NoError(t, err) 2033 _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 4000, 200) 2034 require.NoError(t, err) 2035 _, err = app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 8000, 300) 2036 require.NoError(t, err) 2037 require.NoError(t, app.Commit()) 2038 2039 // Truncating outside the appendable window and actual mint being outside 2040 // appendable window should leave mint at the actual mint. 2041 require.NoError(t, head.Truncate(3500)) 2042 require.Equal(t, int64(4000), head.MinTime()) 2043 require.Equal(t, int64(4000), head.minValidTime.Load()) 2044 2045 // After truncation outside the appendable window if the actual min time 2046 // is in the appendable window then we should leave mint at the start of appendable window. 2047 require.NoError(t, head.Truncate(5000)) 2048 require.Equal(t, head.appendableMinValidTime(), head.MinTime()) 2049 require.Equal(t, head.appendableMinValidTime(), head.minValidTime.Load()) 2050 2051 // If the truncation time is inside the appendable window, then the min time 2052 // should be the truncation time. 2053 require.NoError(t, head.Truncate(7500)) 2054 require.Equal(t, int64(7500), head.MinTime()) 2055 require.Equal(t, int64(7500), head.minValidTime.Load()) 2056 2057 require.NoError(t, head.Close()) 2058} 2059 2060func TestHeadExemplars(t *testing.T) { 2061 chunkRange := int64(2000) 2062 head, _ := newTestHead(t, chunkRange, false) 2063 app := head.Appender(context.Background()) 2064 2065 l := labels.FromStrings("traceId", "123") 2066 // It is perfectly valid to add Exemplars before the current start time - 2067 // histogram buckets that haven't been update in a while could still be 2068 // exported exemplars from an hour ago. 2069 ref, err := app.Append(0, labels.Labels{{Name: "a", Value: "b"}}, 100, 100) 2070 require.NoError(t, err) 2071 _, err = app.AppendExemplar(ref, l, exemplar.Exemplar{ 2072 Labels: l, 2073 HasTs: true, 2074 Ts: -1000, 2075 Value: 1, 2076 }) 2077 require.NoError(t, err) 2078 require.NoError(t, app.Commit()) 2079 require.NoError(t, head.Close()) 2080} 2081 2082func BenchmarkHeadLabelValuesWithMatchers(b *testing.B) { 2083 chunkRange := int64(2000) 2084 head, _ := newTestHead(b, chunkRange, false) 2085 b.Cleanup(func() { require.NoError(b, head.Close()) }) 2086 2087 app := head.Appender(context.Background()) 2088 2089 metricCount := 1000000 2090 for i := 0; i < metricCount; i++ { 2091 _, err := app.Append(0, labels.Labels{ 2092 {Name: "unique", Value: fmt.Sprintf("value%d", i)}, 2093 {Name: "tens", Value: fmt.Sprintf("value%d", i/(metricCount/10))}, 2094 {Name: "ninety", Value: fmt.Sprintf("value%d", i/(metricCount/10)/9)}, // "0" for the first 90%, then "1" 2095 }, 100, 0) 2096 require.NoError(b, err) 2097 } 2098 require.NoError(b, app.Commit()) 2099 2100 headIdxReader := head.indexRange(0, 200) 2101 matchers := []*labels.Matcher{labels.MustNewMatcher(labels.MatchEqual, "ninety", "value0")} 2102 2103 b.ResetTimer() 2104 b.ReportAllocs() 2105 2106 for benchIdx := 0; benchIdx < b.N; benchIdx++ { 2107 actualValues, err := headIdxReader.LabelValues("tens", matchers...) 2108 require.NoError(b, err) 2109 require.Equal(b, 9, len(actualValues)) 2110 } 2111} 2112 2113func TestMemSafeIteratorSeekIntoBuffer(t *testing.T) { 2114 dir, err := ioutil.TempDir("", "iterator_seek") 2115 require.NoError(t, err) 2116 defer func() { 2117 require.NoError(t, os.RemoveAll(dir)) 2118 }() 2119 // This is usually taken from the Head, but passing manually here. 2120 chunkDiskMapper, err := chunks.NewChunkDiskMapper(dir, chunkenc.NewPool(), chunks.DefaultWriteBufferSize) 2121 require.NoError(t, err) 2122 defer func() { 2123 require.NoError(t, chunkDiskMapper.Close()) 2124 }() 2125 2126 s := newMemSeries(labels.Labels{}, 1, 500, nil) 2127 2128 for i := 0; i < 7; i++ { 2129 ok, _ := s.append(int64(i), float64(i), 0, chunkDiskMapper) 2130 require.True(t, ok, "sample append failed") 2131 } 2132 2133 it := s.iterator(s.chunkID(len(s.mmappedChunks)), nil, chunkDiskMapper, nil) 2134 _, ok := it.(*memSafeIterator) 2135 require.True(t, ok) 2136 2137 // First point. 2138 ok = it.Seek(0) 2139 require.True(t, ok) 2140 ts, val := it.At() 2141 require.Equal(t, int64(0), ts) 2142 require.Equal(t, float64(0), val) 2143 2144 // Advance one point. 2145 ok = it.Next() 2146 require.True(t, ok) 2147 ts, val = it.At() 2148 require.Equal(t, int64(1), ts) 2149 require.Equal(t, float64(1), val) 2150 2151 // Seeking an older timestamp shouldn't cause the iterator to go backwards. 2152 ok = it.Seek(0) 2153 require.True(t, ok) 2154 ts, val = it.At() 2155 require.Equal(t, int64(1), ts) 2156 require.Equal(t, float64(1), val) 2157 2158 // Seek into the buffer. 2159 ok = it.Seek(3) 2160 require.True(t, ok) 2161 ts, val = it.At() 2162 require.Equal(t, int64(3), ts) 2163 require.Equal(t, float64(3), val) 2164 2165 // Iterate through the rest of the buffer. 2166 for i := 4; i < 7; i++ { 2167 ok = it.Next() 2168 require.True(t, ok) 2169 ts, val = it.At() 2170 require.Equal(t, int64(i), ts) 2171 require.Equal(t, float64(i), val) 2172 } 2173 2174 // Run out of elements in the iterator. 2175 ok = it.Next() 2176 require.False(t, ok) 2177 ok = it.Seek(7) 2178 require.False(t, ok) 2179} 2180