package ingester

import (
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"sync"
	"syscall"
	"testing"
	"time"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/tenant"
	gokitlog "github.com/go-kit/kit/log"
	"github.com/grafana/dskit/flagext"
	"github.com/grafana/dskit/kv"
	"github.com/grafana/dskit/services"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/stretchr/testify/require"
	"github.com/weaveworks/common/user"
	"golang.org/x/net/context"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/ingester/client"
	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logql/log"
	"github.com/grafana/loki/pkg/runtime"
	"github.com/grafana/loki/pkg/storage"
	"github.com/grafana/loki/pkg/storage/chunk"
	"github.com/grafana/loki/pkg/validation"
)

const (
	numSeries        = 10
	samplesPerSeries = 100
)

func TestChunkFlushingIdle(t *testing.T) {
	cfg := defaultIngesterTestConfig(t)
	cfg.FlushCheckPeriod = 20 * time.Millisecond
	cfg.MaxChunkIdle = 100 * time.Millisecond
	cfg.RetainPeriod = 500 * time.Millisecond

	store, ing := newTestStore(t, cfg, nil)
	defer services.StopAndAwaitTerminated(context.Background(), ing) //nolint:errcheck
	testData := pushTestSamples(t, ing)

	// wait beyond the idle time so samples flush
	time.Sleep(cfg.MaxChunkIdle * 2)
	store.checkData(t, testData)
}

func TestChunkFlushingShutdown(t *testing.T) {
	store, ing := newTestStore(t, defaultIngesterTestConfig(t), nil)
	testData := pushTestSamples(t, ing)
	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
	store.checkData(t, testData)
}

// fullWAL is a WAL stub whose Log always fails with ENOSPC, simulating a full disk.
type fullWAL struct{}

func (fullWAL) Log(_ *WALRecord) error { return &os.PathError{Err: syscall.ENOSPC} }
func (fullWAL) Start()                 {}
func (fullWAL) Stop() error            { return nil }

func Benchmark_FlushLoop(b *testing.B) {
	var (
		size   = 5
		descs  [][]*chunkDesc
		lbs    = makeRandomLabels()
		ctx    = user.InjectOrgID(context.Background(), "foo")
		_, ing = newTestStore(b, defaultIngesterTestConfig(b), nil)
	)

	for i := 0; i < size; i++ {
		descs = append(descs, buildChunkDecs(b))
	}

	b.ResetTimer()
	b.ReportAllocs()

	for n := 0; n < b.N; n++ {
		var wg sync.WaitGroup
		for i := 0; i < size; i++ {
			wg.Add(1)
			go func(loop int) {
				defer wg.Done()
				require.NoError(b, ing.flushChunks(ctx, 0, lbs, descs[loop], &sync.RWMutex{}))
			}(i)
		}
		wg.Wait()
	}
}

func Test_Flush(t *testing.T) {
	var (
		store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
		lbs        = makeRandomLabels()
		ctx        = user.InjectOrgID(context.Background(), "foo")
	)
	store.onPut = func(ctx context.Context, chunks []chunk.Chunk) error {
		for _, c := range chunks {
			buf, err := c.Encoded()
			require.Nil(t, err)
			if err := c.Decode(chunk.NewDecodeContext(), buf); err != nil {
				return err
			}
		}
		return nil
	}
	require.NoError(t, ing.flushChunks(ctx, 0, lbs, buildChunkDecs(t), &sync.RWMutex{}))
}

func buildChunkDecs(t testing.TB) []*chunkDesc {
	res := make([]*chunkDesc, 10)
	for i := range res {
		res[i] = &chunkDesc{
			closed: true,
			chunk:  chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
		}
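		// Fill the chunk with test entries, then close it so it is complete
		// and eligible for flushing.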
		fillChunk(t, res[i].chunk)
		require.NoError(t, res[i].chunk.Close())
	}
	return res
}

func TestWALFullFlush(t *testing.T) {
	// The WAL is technically replaced with a fake, but the ingester New() function
	// creates a regular WAL first, so we enable creation/cleanup even though it
	// remains unused.
	walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
	require.Nil(t, err)
	defer os.RemoveAll(walDir)

	store, ing := newTestStore(t, defaultIngesterTestConfigWithWAL(t, walDir), fullWAL{})
	testData := pushTestSamples(t, ing)
	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
	store.checkData(t, testData)
}

func TestFlushingCollidingLabels(t *testing.T) {
	cfg := defaultIngesterTestConfig(t)
	cfg.FlushCheckPeriod = 20 * time.Millisecond
	cfg.MaxChunkIdle = 100 * time.Millisecond
	cfg.RetainPeriod = 500 * time.Millisecond

	store, ing := newTestStore(t, cfg, nil)
	defer store.Stop()

	const userID = "testUser"
	ctx := user.InjectOrgID(context.Background(), userID)

	// checkData only iterates between unix seconds 0 and 1000
	now := time.Unix(0, 0)

	req := &logproto.PushRequest{Streams: []logproto.Stream{
		// some colliding label sets
		{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "1"}.String(), Entries: entries(5, now.Add(time.Minute))},
		{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "1"}.String(), Entries: entries(5, now)},
		{Labels: model.LabelSet{"app": "l", "uniq0": "1", "uniq1": "0"}.String(), Entries: entries(5, now.Add(time.Minute))},
		{Labels: model.LabelSet{"app": "m", "uniq0": "0", "uniq1": "0"}.String(), Entries: entries(5, now)},
		{Labels: model.LabelSet{"app": "l", "uniq0": "0", "uniq1": "0"}.String(), Entries: entries(5, now.Add(time.Minute))},
		{Labels: model.LabelSet{"app": "m", "uniq0": "1", "uniq1": "0"}.String(), Entries: entries(5, now)},
	}}

	sort.Slice(req.Streams, func(i, j int) bool {
		return req.Streams[i].Labels < req.Streams[j].Labels
	})

	_, err := ing.Push(ctx, req)
	require.NoError(t, err)

	// force flush
	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))

	// verify that we get all the data back
	store.checkData(t, map[string][]logproto.Stream{userID: req.Streams})

	// make sure all chunks have different fingerprints, even colliding ones.
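	// The label sets above are chosen so that some of them collide in
	// fingerprint space; the check below asserts that the ingester still
	// assigns every stored chunk a distinct fingerprint.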
	chunkFingerprints := map[model.Fingerprint]bool{}
	for _, c := range store.getChunksForUser(userID) {
		require.False(t, chunkFingerprints[c.Fingerprint])
		chunkFingerprints[c.Fingerprint] = true
	}
}

func TestFlushMaxAge(t *testing.T) {
	cfg := defaultIngesterTestConfig(t)
	cfg.FlushCheckPeriod = time.Millisecond * 100
	cfg.MaxChunkAge = time.Minute
	cfg.MaxChunkIdle = time.Hour

	store, ing := newTestStore(t, cfg, nil)
	defer store.Stop()

	now := time.Unix(0, 0)

	firstEntries := []logproto.Entry{
		{Timestamp: now.Add(time.Nanosecond), Line: "1"},
		{Timestamp: now.Add(time.Minute), Line: "2"},
	}

	secondEntries := []logproto.Entry{
		{Timestamp: now.Add(time.Second * 61), Line: "3"},
	}

	req := &logproto.PushRequest{Streams: []logproto.Stream{
		{Labels: model.LabelSet{"app": "l"}.String(), Entries: firstEntries},
	}}

	const userID = "testUser"
	ctx := user.InjectOrgID(context.Background(), userID)

	_, err := ing.Push(ctx, req)
	require.NoError(t, err)

	time.Sleep(2 * cfg.FlushCheckPeriod)

	// ensure the chunk is not flushed once the flush check period elapses;
	// it has not yet reached MaxChunkAge
	store.checkData(t, map[string][]logproto.Stream{})

	req2 := &logproto.PushRequest{Streams: []logproto.Stream{
		{Labels: model.LabelSet{"app": "l"}.String(), Entries: secondEntries},
	}}

	_, err = ing.Push(ctx, req2)
	require.NoError(t, err)

	time.Sleep(2 * cfg.FlushCheckPeriod)

	// assert the stream now contains both batches
	store.checkData(t, map[string][]logproto.Stream{
		userID: {
			{Labels: model.LabelSet{"app": "l"}.String(), Entries: append(firstEntries, secondEntries...)},
		},
	})

	require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))
}

type testStore struct {
	mtx sync.Mutex
	// Chunks keyed by userID.
	chunks map[string][]chunk.Chunk
	onPut  func(ctx context.Context, chunks []chunk.Chunk) error
}

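// The onPut hook lets a test intercept flushed chunks, as Test_Flush does
// above. As a minimal sketch (hypothetical, not used by these tests), a
// store-side failure could be simulated like so:
//
//	store.onPut = func(_ context.Context, _ []chunk.Chunk) error {
//		return errors.New("store unavailable") // would need the stdlib "errors" import
//	}
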
// Note: the ingester New() function creates its own WAL first, which we then override if specified.
// Because of this, ensure any WAL directories exist/are cleaned up even when overriding the WAL.
// This is an ugly hook for testing :(
func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, *Ingester) {
	store := &testStore{
		chunks: map[string][]chunk.Chunk{},
	}

	limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
	require.NoError(t, err)

	ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
	require.NoError(t, err)
	require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

	if walOverride != nil {
		_ = ing.wal.Stop()
		ing.wal = walOverride
	}

	return store, ing
}

// nolint
func defaultIngesterTestConfig(t testing.TB) Config {
	kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec(), nil, gokitlog.NewNopLogger())
	require.NoError(t, err)

	cfg := Config{}
	flagext.DefaultValues(&cfg)
	cfg.FlushCheckPeriod = 99999 * time.Hour
	cfg.MaxChunkIdle = 99999 * time.Hour
	cfg.ConcurrentFlushes = 1
	cfg.LifecyclerConfig.RingConfig.KVStore.Mock = kvClient
	cfg.LifecyclerConfig.NumTokens = 1
	cfg.LifecyclerConfig.ListenPort = 0
	cfg.LifecyclerConfig.Addr = "localhost"
	cfg.LifecyclerConfig.ID = "localhost"
	cfg.LifecyclerConfig.FinalSleep = 0
	cfg.LifecyclerConfig.MinReadyDuration = 0
	cfg.BlockSize = 256 * 1024
	cfg.TargetChunkSize = 1500 * 1024
	return cfg
}

func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
	s.mtx.Lock()
	defer s.mtx.Unlock()
	if s.onPut != nil {
		return s.onPut(ctx, chunks)
	}
	userID, err := tenant.TenantID(ctx)
	if err != nil {
		return err
	}
	for ix, chunk := range chunks {
		for _, label := range chunk.Metric {
			if label.Value == "" {
				return fmt.Errorf("chunk has blank label %q", label.Name)
			}
		}

		// remove __name__ label
		if chunk.Metric.Has("__name__") {
			labelsBuilder := labels.NewBuilder(chunk.Metric)
			labelsBuilder.Del("__name__")
			chunks[ix].Metric = labelsBuilder.Labels()
		}
	}
	s.chunks[userID] = append(s.chunks[userID], chunks...)
	return nil
}

func (s *testStore) IsLocal() bool {
	return false
}

func (s *testStore) SelectLogs(ctx context.Context, req logql.SelectLogParams) (iter.EntryIterator, error) {
	return nil, nil
}

func (s *testStore) SelectSamples(ctx context.Context, req logql.SelectSampleParams) (iter.SampleIterator, error) {
	return nil, nil
}

func (s *testStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
	return nil, nil, nil
}

func (s *testStore) GetSchemaConfigs() []chunk.PeriodConfig {
	return nil
}

func (s *testStore) Stop() {}

func (s *testStore) SetChunkFilterer(_ storage.RequestChunkFilterer) {}

func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream {
	userIDs := []string{"1", "2", "3"}

	// Create test samples.
	testData := map[string][]logproto.Stream{}
	for i, userID := range userIDs {
		testData[userID] = buildTestStreams(i)
	}

	// Append samples.
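	// Each push is issued under its own tenant (org) ID so that the test
	// store can key the flushed chunks per user.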
	for _, userID := range userIDs {
		ctx := user.InjectOrgID(context.Background(), userID)
		_, err := ing.Push(ctx, &logproto.PushRequest{
			Streams: testData[userID],
		})
		require.NoError(t, err)
	}
	return testData
}

func buildTestStreams(offset int) []logproto.Stream {
	var m []logproto.Stream
	for i := 0; i < numSeries; i++ {
		ss := logproto.Stream{
			Labels: model.Metric{
				"name":         model.LabelValue(fmt.Sprintf("testmetric_%d", i)),
				model.JobLabel: "testjob",
			}.String(),
		}
		for j := 0; j < samplesPerSeries; j++ {
			ss.Entries = append(ss.Entries, logproto.Entry{
				Timestamp: time.Unix(int64(i+j+offset), 0),
				Line:      "line",
			})
		}
		m = append(m, ss)
	}

	sort.Slice(m, func(i, j int) bool {
		return m[i].Labels < m[j].Labels
	})

	return m
}

// check that the store is holding data equivalent to what we expect
func (s *testStore) checkData(t *testing.T, testData map[string][]logproto.Stream) {
	for userID, expected := range testData {
		streams := s.getStreamsForUser(t, userID)
		require.Equal(t, expected, streams)
	}
}

func (s *testStore) getStreamsForUser(t *testing.T, userID string) []logproto.Stream {
	var streams []logproto.Stream
	for _, c := range s.getChunksForUser(userID) {
		lokiChunk := c.Data.(*chunkenc.Facade).LokiChunk()
		streams = append(streams, buildStreamsFromChunk(t, c.Metric.String(), lokiChunk))
	}
	sort.Slice(streams, func(i, j int) bool {
		return streams[i].Labels < streams[j].Labels
	})
	return streams
}

func (s *testStore) getChunksForUser(userID string) []chunk.Chunk {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	return s.chunks[userID]
}

func buildStreamsFromChunk(t *testing.T, lbs string, chk chunkenc.Chunk) logproto.Stream {
	it, err := chk.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
	require.NoError(t, err)

	stream := logproto.Stream{
		Labels: lbs,
	}
	for it.Next() {
		stream.Entries = append(stream.Entries, it.Entry())
	}
	require.NoError(t, it.Error())
	return stream
}