1// Copyright 2013 The Prometheus Authors 2// Licensed under the Apache License, Version 2.0 (the "License"); 3// you may not use this file except in compliance with the License. 4// You may obtain a copy of the License at 5// 6// http://www.apache.org/licenses/LICENSE-2.0 7// 8// Unless required by applicable law or agreed to in writing, software 9// distributed under the License is distributed on an "AS IS" BASIS, 10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 11// See the License for the specific language governing permissions and 12// limitations under the License. 13 14package remote 15 16import ( 17 "context" 18 "fmt" 19 "io/ioutil" 20 "math" 21 "os" 22 "reflect" 23 "sort" 24 "strconv" 25 "sync" 26 "sync/atomic" 27 "testing" 28 "time" 29 30 "github.com/go-kit/kit/log" 31 "github.com/gogo/protobuf/proto" 32 "github.com/golang/snappy" 33 "github.com/stretchr/testify/require" 34 35 client_testutil "github.com/prometheus/client_golang/prometheus/testutil" 36 "github.com/prometheus/common/model" 37 "github.com/prometheus/prometheus/config" 38 "github.com/prometheus/prometheus/pkg/labels" 39 "github.com/prometheus/prometheus/prompb" 40 "github.com/prometheus/prometheus/util/testutil" 41 "github.com/prometheus/tsdb" 42 tsdbLabels "github.com/prometheus/tsdb/labels" 43) 44 45const defaultFlushDeadline = 1 * time.Minute 46 47func TestSampleDelivery(t *testing.T) { 48 // Let's create an even number of send batches so we don't run into the 49 // batch timeout case. 50 n := config.DefaultQueueConfig.Capacity * 2 51 samples, series := createTimeseries(n) 52 53 c := NewTestStorageClient() 54 c.expectSamples(samples[:len(samples)/2], series) 55 56 cfg := config.DefaultQueueConfig 57 cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) 58 cfg.MaxShards = 1 59 60 dir, err := ioutil.TempDir("", "TestSampleDeliver") 61 testutil.Ok(t, err) 62 defer os.RemoveAll(dir) 63 64 m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) 65 m.StoreSeries(series, 0) 66 67 // These should be received by the client. 68 m.Start() 69 m.Append(samples[:len(samples)/2]) 70 defer m.Stop() 71 72 c.waitForExpectedSamples(t) 73 m.Append(samples[len(samples)/2:]) 74 c.expectSamples(samples[len(samples)/2:], series) 75 c.waitForExpectedSamples(t) 76} 77 78func TestSampleDeliveryTimeout(t *testing.T) { 79 // Let's send one less sample than batch size, and wait the timeout duration 80 n := 9 81 samples, series := createTimeseries(n) 82 c := NewTestStorageClient() 83 84 cfg := config.DefaultQueueConfig 85 cfg.MaxShards = 1 86 cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) 87 88 dir, err := ioutil.TempDir("", "TestSampleDeliveryTimeout") 89 testutil.Ok(t, err) 90 defer os.RemoveAll(dir) 91 92 m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) 93 m.StoreSeries(series, 0) 94 m.Start() 95 defer m.Stop() 96 97 // Send the samples twice, waiting for the samples in the meantime. 98 c.expectSamples(samples, series) 99 m.Append(samples) 100 c.waitForExpectedSamples(t) 101 102 c.expectSamples(samples, series) 103 m.Append(samples) 104 c.waitForExpectedSamples(t) 105} 106 107func TestSampleDeliveryOrder(t *testing.T) { 108 ts := 10 109 n := config.DefaultQueueConfig.MaxSamplesPerSend * ts 110 samples := make([]tsdb.RefSample, 0, n) 111 series := make([]tsdb.RefSeries, 0, n) 112 for i := 0; i < n; i++ { 113 name := fmt.Sprintf("test_metric_%d", i%ts) 114 samples = append(samples, tsdb.RefSample{ 115 Ref: uint64(i), 116 T: int64(i), 117 V: float64(i), 118 }) 119 series = append(series, tsdb.RefSeries{ 120 Ref: uint64(i), 121 Labels: tsdbLabels.Labels{tsdbLabels.Label{Name: "__name__", Value: name}}, 122 }) 123 } 124 125 c := NewTestStorageClient() 126 c.expectSamples(samples, series) 127 128 dir, err := ioutil.TempDir("", "TestSampleDeliveryOrder") 129 testutil.Ok(t, err) 130 defer os.RemoveAll(dir) 131 132 m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) 133 m.StoreSeries(series, 0) 134 135 m.Start() 136 defer m.Stop() 137 // These should be received by the client. 138 m.Append(samples) 139 c.waitForExpectedSamples(t) 140} 141 142func TestShutdown(t *testing.T) { 143 deadline := 1 * time.Second 144 c := NewTestBlockedStorageClient() 145 146 dir, err := ioutil.TempDir("", "TestShutdown") 147 testutil.Ok(t, err) 148 defer os.RemoveAll(dir) 149 150 m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) 151 samples, series := createTimeseries(2 * config.DefaultQueueConfig.MaxSamplesPerSend) 152 m.StoreSeries(series, 0) 153 m.Start() 154 155 // Append blocks to guarantee delivery, so we do it in the background. 156 go func() { 157 m.Append(samples) 158 }() 159 time.Sleep(100 * time.Millisecond) 160 161 // Test to ensure that Stop doesn't block. 162 start := time.Now() 163 m.Stop() 164 // The samples will never be delivered, so duration should 165 // be at least equal to deadline, otherwise the flush deadline 166 // was not respected. 167 duration := time.Since(start) 168 if duration > time.Duration(deadline+(deadline/10)) { 169 t.Errorf("Took too long to shutdown: %s > %s", duration, deadline) 170 } 171 if duration < time.Duration(deadline) { 172 t.Errorf("Shutdown occurred before flush deadline: %s < %s", duration, deadline) 173 } 174} 175 176func TestSeriesReset(t *testing.T) { 177 c := NewTestBlockedStorageClient() 178 deadline := 5 * time.Second 179 numSegments := 4 180 numSeries := 25 181 182 dir, err := ioutil.TempDir("", "TestSeriesReset") 183 testutil.Ok(t, err) 184 defer os.RemoveAll(dir) 185 186 m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, deadline) 187 for i := 0; i < numSegments; i++ { 188 series := []tsdb.RefSeries{} 189 for j := 0; j < numSeries; j++ { 190 series = append(series, tsdb.RefSeries{Ref: uint64((i * 100) + j), Labels: tsdbLabels.Labels{{Name: "a", Value: "a"}}}) 191 } 192 m.StoreSeries(series, i) 193 } 194 testutil.Equals(t, numSegments*numSeries, len(m.seriesLabels)) 195 m.SeriesReset(2) 196 testutil.Equals(t, numSegments*numSeries/2, len(m.seriesLabels)) 197} 198 199func TestReshard(t *testing.T) { 200 size := 10 // Make bigger to find more races. 201 n := config.DefaultQueueConfig.Capacity * size 202 samples, series := createTimeseries(n) 203 204 c := NewTestStorageClient() 205 c.expectSamples(samples, series) 206 207 cfg := config.DefaultQueueConfig 208 cfg.MaxShards = 1 209 210 dir, err := ioutil.TempDir("", "TestReshard") 211 testutil.Ok(t, err) 212 defer os.RemoveAll(dir) 213 214 m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) 215 m.StoreSeries(series, 0) 216 217 m.Start() 218 defer m.Stop() 219 220 go func() { 221 for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity { 222 sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity]) 223 require.True(t, sent) 224 time.Sleep(100 * time.Millisecond) 225 } 226 }() 227 228 for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ { 229 m.shards.stop() 230 m.shards.start(i) 231 time.Sleep(100 * time.Millisecond) 232 } 233 234 c.waitForExpectedSamples(t) 235} 236 237func TestReshardRaceWithStop(t *testing.T) { 238 c := NewTestStorageClient() 239 var m *QueueManager 240 h := sync.Mutex{} 241 242 h.Lock() 243 244 go func() { 245 for { 246 m = NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) 247 m.Start() 248 h.Unlock() 249 h.Lock() 250 m.Stop() 251 } 252 }() 253 254 for i := 1; i < 100; i++ { 255 h.Lock() 256 m.reshardChan <- i 257 h.Unlock() 258 } 259} 260 261func TestReleaseNoninternedString(t *testing.T) { 262 c := NewTestStorageClient() 263 m := NewQueueManager(nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), config.DefaultQueueConfig, nil, nil, c, defaultFlushDeadline) 264 m.Start() 265 266 for i := 1; i < 1000; i++ { 267 m.StoreSeries([]tsdb.RefSeries{ 268 tsdb.RefSeries{ 269 Ref: uint64(i), 270 Labels: tsdbLabels.Labels{ 271 tsdbLabels.Label{ 272 Name: "asdf", 273 Value: fmt.Sprintf("%d", i), 274 }, 275 }, 276 }, 277 }, 0) 278 m.SeriesReset(1) 279 } 280 281 metric := client_testutil.ToFloat64(noReferenceReleases) 282 testutil.Assert(t, metric == 0, "expected there to be no calls to release for strings that were not already interned: %d", int(metric)) 283} 284 285func createTimeseries(n int) ([]tsdb.RefSample, []tsdb.RefSeries) { 286 samples := make([]tsdb.RefSample, 0, n) 287 series := make([]tsdb.RefSeries, 0, n) 288 for i := 0; i < n; i++ { 289 name := fmt.Sprintf("test_metric_%d", i) 290 samples = append(samples, tsdb.RefSample{ 291 Ref: uint64(i), 292 T: int64(i), 293 V: float64(i), 294 }) 295 series = append(series, tsdb.RefSeries{ 296 Ref: uint64(i), 297 Labels: tsdbLabels.Labels{{Name: "__name__", Value: name}}, 298 }) 299 } 300 return samples, series 301} 302 303func getSeriesNameFromRef(r tsdb.RefSeries) string { 304 for _, l := range r.Labels { 305 if l.Name == "__name__" { 306 return l.Value 307 } 308 } 309 return "" 310} 311 312type TestStorageClient struct { 313 receivedSamples map[string][]prompb.Sample 314 expectedSamples map[string][]prompb.Sample 315 wg sync.WaitGroup 316 mtx sync.Mutex 317 buf []byte 318} 319 320func NewTestStorageClient() *TestStorageClient { 321 return &TestStorageClient{ 322 receivedSamples: map[string][]prompb.Sample{}, 323 expectedSamples: map[string][]prompb.Sample{}, 324 } 325} 326 327func (c *TestStorageClient) expectSamples(ss []tsdb.RefSample, series []tsdb.RefSeries) { 328 c.mtx.Lock() 329 defer c.mtx.Unlock() 330 331 c.expectedSamples = map[string][]prompb.Sample{} 332 c.receivedSamples = map[string][]prompb.Sample{} 333 334 for _, s := range ss { 335 seriesName := getSeriesNameFromRef(series[s.Ref]) 336 c.expectedSamples[seriesName] = append(c.expectedSamples[seriesName], prompb.Sample{ 337 Timestamp: s.T, 338 Value: s.V, 339 }) 340 } 341 c.wg.Add(len(ss)) 342} 343 344func (c *TestStorageClient) waitForExpectedSamples(tb testing.TB) { 345 c.wg.Wait() 346 c.mtx.Lock() 347 defer c.mtx.Unlock() 348 for ts, expectedSamples := range c.expectedSamples { 349 if !reflect.DeepEqual(expectedSamples, c.receivedSamples[ts]) { 350 tb.Fatalf("%s: Expected %v, got %v", ts, expectedSamples, c.receivedSamples[ts]) 351 } 352 } 353} 354 355func (c *TestStorageClient) expectSampleCount(ss []tsdb.RefSample) { 356 c.mtx.Lock() 357 defer c.mtx.Unlock() 358 c.wg.Add(len(ss)) 359} 360 361func (c *TestStorageClient) waitForExpectedSampleCount() { 362 c.wg.Wait() 363} 364 365func (c *TestStorageClient) Store(_ context.Context, req []byte) error { 366 c.mtx.Lock() 367 defer c.mtx.Unlock() 368 // nil buffers are ok for snappy, ignore cast error. 369 if c.buf != nil { 370 c.buf = c.buf[:cap(c.buf)] 371 } 372 reqBuf, err := snappy.Decode(c.buf, req) 373 c.buf = reqBuf 374 if err != nil { 375 return err 376 } 377 378 var reqProto prompb.WriteRequest 379 if err := proto.Unmarshal(reqBuf, &reqProto); err != nil { 380 return err 381 } 382 383 count := 0 384 for _, ts := range reqProto.Timeseries { 385 var seriesName string 386 labels := labelProtosToLabels(ts.Labels) 387 for _, label := range labels { 388 if label.Name == "__name__" { 389 seriesName = label.Value 390 } 391 } 392 for _, sample := range ts.Samples { 393 count++ 394 c.receivedSamples[seriesName] = append(c.receivedSamples[seriesName], sample) 395 } 396 } 397 c.wg.Add(-count) 398 return nil 399} 400 401func (c *TestStorageClient) Name() string { 402 return "teststorageclient" 403} 404 405// TestBlockingStorageClient is a queue_manager StorageClient which will block 406// on any calls to Store(), until the request's Context is cancelled, at which 407// point the `numCalls` property will contain a count of how many times Store() 408// was called. 409type TestBlockingStorageClient struct { 410 numCalls uint64 411} 412 413func NewTestBlockedStorageClient() *TestBlockingStorageClient { 414 return &TestBlockingStorageClient{} 415} 416 417func (c *TestBlockingStorageClient) Store(ctx context.Context, _ []byte) error { 418 atomic.AddUint64(&c.numCalls, 1) 419 <-ctx.Done() 420 return nil 421} 422 423func (c *TestBlockingStorageClient) NumCalls() uint64 { 424 return atomic.LoadUint64(&c.numCalls) 425} 426 427func (c *TestBlockingStorageClient) Name() string { 428 return "testblockingstorageclient" 429} 430 431func BenchmarkSampleDelivery(b *testing.B) { 432 // Let's create an even number of send batches so we don't run into the 433 // batch timeout case. 434 n := config.DefaultQueueConfig.MaxSamplesPerSend * 10 435 samples, series := createTimeseries(n) 436 437 c := NewTestStorageClient() 438 439 cfg := config.DefaultQueueConfig 440 cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) 441 cfg.MaxShards = 1 442 443 dir, err := ioutil.TempDir("", "BenchmarkSampleDelivery") 444 testutil.Ok(b, err) 445 defer os.RemoveAll(dir) 446 447 m := NewQueueManager(nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) 448 m.StoreSeries(series, 0) 449 450 // These should be received by the client. 451 m.Start() 452 defer m.Stop() 453 454 b.ResetTimer() 455 for i := 0; i < b.N; i++ { 456 c.expectSampleCount(samples) 457 m.Append(samples) 458 c.waitForExpectedSampleCount() 459 } 460 // Do not include shutdown 461 b.StopTimer() 462} 463 464func BenchmarkStartup(b *testing.B) { 465 dir := os.Getenv("WALDIR") 466 if dir == "" { 467 return 468 } 469 470 // Find the second largest segment; we will replay up to this. 471 // (Second largest as WALWatcher will start tailing the largest). 472 dirents, err := ioutil.ReadDir(dir) 473 testutil.Ok(b, err) 474 475 var segments []int 476 for _, dirent := range dirents { 477 if i, err := strconv.Atoi(dirent.Name()); err != nil { 478 segments = append(segments, i) 479 } 480 } 481 sort.Ints(segments) 482 483 logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stdout)) 484 logger = log.With(logger, "caller", log.DefaultCaller) 485 486 for n := 0; n < b.N; n++ { 487 c := NewTestBlockedStorageClient() 488 m := NewQueueManager(logger, dir, 489 newEWMARate(ewmaWeight, shardUpdateDuration), 490 config.DefaultQueueConfig, nil, nil, c, 1*time.Minute) 491 m.watcher.startTime = math.MaxInt64 492 m.watcher.maxSegment = segments[len(segments)-2] 493 err := m.watcher.run() 494 testutil.Ok(b, err) 495 } 496} 497 498func TestProcessExternalLabels(t *testing.T) { 499 for _, tc := range []struct { 500 labels tsdbLabels.Labels 501 externalLabels labels.Labels 502 expected labels.Labels 503 }{ 504 // Test adding labels at the end. 505 { 506 labels: tsdbLabels.Labels{{Name: "a", Value: "b"}}, 507 externalLabels: labels.Labels{{Name: "c", Value: "d"}}, 508 expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, 509 }, 510 511 // Test adding labels at the beginning. 512 { 513 labels: tsdbLabels.Labels{{Name: "c", Value: "d"}}, 514 externalLabels: labels.Labels{{Name: "a", Value: "b"}}, 515 expected: labels.Labels{{Name: "a", Value: "b"}, {Name: "c", Value: "d"}}, 516 }, 517 518 // Test we don't override existing labels. 519 { 520 labels: tsdbLabels.Labels{{Name: "a", Value: "b"}}, 521 externalLabels: labels.Labels{{Name: "a", Value: "c"}}, 522 expected: labels.Labels{{Name: "a", Value: "b"}}, 523 }, 524 } { 525 require.Equal(t, tc.expected, processExternalLabels(tc.labels, tc.externalLabels)) 526 } 527} 528