package ingester

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	util_log "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/go-kit/kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/weaveworks/common/httpgrpc"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql/log"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
	"github.com/grafana/loki/pkg/util/flagext"
	"github.com/grafana/loki/pkg/validation"
)

var (
	chunksCreatedTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "loki",
		Name:      "ingester_chunks_created_total",
		Help:      "The total number of chunks created in the ingester.",
	})
	samplesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "loki",
		Subsystem: "ingester",
		Name:      "samples_per_chunk",
		Help:      "The number of samples in a chunk.",

		Buckets: prometheus.LinearBuckets(4096, 2048, 6),
	})
	blocksPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "loki",
		Subsystem: "ingester",
		Name:      "blocks_per_chunk",
		Help:      "The number of blocks in a chunk.",

		Buckets: prometheus.ExponentialBuckets(5, 2, 6),
	})
)

var (
	ErrEntriesExist = errors.New("duplicate push - entries already exist")
)

func init() {
	prometheus.MustRegister(chunksCreatedTotal)
	prometheus.MustRegister(samplesPerChunk)
	prometheus.MustRegister(blocksPerChunk)
}

type line struct {
	ts      time.Time
	content string
}

type stream struct {
	limiter *StreamRateLimiter
	cfg     *Config
	tenant  string
	// Newest chunk at chunks[n-1].
	// Not thread-safe; assume accesses to this are locked by caller.
	chunks   []chunkDesc
	fp       model.Fingerprint // possibly remapped fingerprint, used in the streams map
	chunkMtx sync.RWMutex

	labels       labels.Labels
	labelsString string

	// most recently pushed line. This is used to prevent duplicate pushes.
	// It also determines chunk synchronization when unordered writes are disabled.
	lastLine line

	// keeps track of the highest timestamp accepted by the stream.
	// This is used when unordered writes are enabled to cap the validity window
	// of accepted writes and for chunk synchronization.
	highestTs time.Time

	metrics *ingesterMetrics

	tailers   map[uint32]*tailer
	tailerMtx sync.RWMutex

	// entryCt is a counter which is incremented on each accepted entry.
	// This allows us to discard WAL entries during replays which were
	// already recovered via checkpoints. Historically out of order
	// errors were used to detect this, but this counter has been
	// introduced to facilitate removing the ordering constraint.
	entryCt int64

	unorderedWrites bool
}

type chunkDesc struct {
	chunk   *chunkenc.MemChunk
	closed  bool
	synced  bool
	flushed time.Time

	lastUpdated time.Time
}

type entryWithError struct {
	entry *logproto.Entry
	e     error
}

func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
	return &stream{
		limiter:         NewStreamRateLimiter(limits, tenant, 10*time.Second),
		cfg:             cfg,
		fp:              fp,
		labels:          labels,
		labelsString:    labels.String(),
		tailers:         map[uint32]*tailer{},
		metrics:         metrics,
		tenant:          tenant,
		unorderedWrites: unorderedWrites,
	}
}

// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
// DEPRECATED: chunk transfers are no longer suggested and remain for compatibility.
func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
	c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize)
	if err != nil {
		return err
	}

	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()
	s.chunks = append(s.chunks, chunkDesc{
		chunk: c,
	})
	chunksCreatedTotal.Inc()
	return nil
}

// setChunks is used during checkpoint recovery.
func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err error) {
	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()
	chks, err := fromWireChunks(s.cfg, chunks)
	if err != nil {
		return 0, 0, err
	}
	s.chunks = chks
	for _, c := range s.chunks {
		entriesAdded += c.chunk.Size()
		bytesAdded += c.chunk.UncompressedSize()
	}
	return bytesAdded, entriesAdded, nil
}

func (s *stream) NewChunk() *chunkenc.MemChunk {
	return chunkenc.NewMemChunk(s.cfg.parsedEncoding, headBlockType(s.unorderedWrites), s.cfg.BlockSize, s.cfg.TargetChunkSize)
}

func (s *stream) Push(
	ctx context.Context,
	entries []logproto.Entry,
	// WAL record to add push contents to.
	// May be nil to disable this functionality.
	record *WALRecord,
	// Counter used in WAL replay to avoid duplicates.
	// If this is non-zero, the stream will reject entries
	// with a counter value less than or equal to its own.
	// It is set to zero and thus bypassed outside of WAL replays.
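	// Push records the stream's entryCt alongside the entries in each WAL record
	// (see record.AddEntries below); a replayed record whose counter is at or below
	// the stream's current entryCt fails the check at the top of this function and
	// is dropped as a duplicate.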
	counter int64,
) (int, error) {
	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()

	if counter > 0 && counter <= s.entryCt {
		var byteCt int
		for _, e := range entries {
			byteCt += len(e.Line)
		}

		s.metrics.walReplaySamplesDropped.WithLabelValues(duplicateReason).Add(float64(len(entries)))
		s.metrics.walReplayBytesDropped.WithLabelValues(duplicateReason).Add(float64(byteCt))
		return 0, ErrEntriesExist
	}

	var bytesAdded int
	prevNumChunks := len(s.chunks)
	if prevNumChunks == 0 {
		s.chunks = append(s.chunks, chunkDesc{
			chunk: s.NewChunk(),
		})
		chunksCreatedTotal.Inc()
	}

	var storedEntries []logproto.Entry
	failedEntriesWithError := []entryWithError{}

	var outOfOrderSamples, outOfOrderBytes int
	var rateLimitedSamples, rateLimitedBytes int
	defer func() {
		if outOfOrderSamples > 0 {
			validation.DiscardedSamples.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderSamples))
			validation.DiscardedBytes.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderBytes))
		}
		if rateLimitedSamples > 0 {
			validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
			validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes))
		}
	}()

	// This call uses a mutex under the hood; cache the result since we check the limit
	// on each entry in the push (hot path) and we only use this value when logging entries
	// over the rate limit.
	limit := s.limiter.lim.Limit()

	// Don't fail on the first append error - if samples are sent out of order,
	// we still want to append the later ones.
	for i := range entries {
		// If this entry matches our last appended line's timestamp and contents,
		// ignore it.
		//
		// This check is done at the stream level so it persists across cut and
		// flushed chunks.
		//
		// NOTE: it's still possible for duplicates to be appended if a stream is
		// deleted from inactivity.
		if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content {
			continue
		}

		chunk := &s.chunks[len(s.chunks)-1]
		if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, s.highestTs, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
			chunk = s.cutChunk(ctx)
		}
		// Check if this entry should be rate limited.
		now := time.Now()
		if !s.limiter.AllowN(now, len(entries[i].Line)) {
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[i].Line))}})
			rateLimitedSamples++
			rateLimitedBytes += len(entries[i].Line)
			continue
		}

		// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
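		// For example, with a MaxChunkAge of 2h and a highest accepted timestamp of
		// 12:00, any entry older than 11:00 is rejected as out of order.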
		if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrOutOfOrder})
			outOfOrderSamples++
			outOfOrderBytes += len(entries[i].Line)
		} else if err := chunk.chunk.Append(&entries[i]); err != nil {
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
			if err == chunkenc.ErrOutOfOrder {
				outOfOrderSamples++
				outOfOrderBytes += len(entries[i].Line)
			}
		} else {
			storedEntries = append(storedEntries, entries[i])
			s.lastLine.ts = entries[i].Timestamp
			s.lastLine.content = entries[i].Line
			if s.highestTs.Before(entries[i].Timestamp) {
				s.highestTs = entries[i].Timestamp
			}
			s.entryCt++

			// Account for the length of the stored line.
			bytesAdded += len(entries[i].Line)
		}
		chunk.lastUpdated = time.Now()
	}

	if len(storedEntries) != 0 {
		// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
		if record != nil {
			record.AddEntries(uint64(s.fp), s.entryCt, storedEntries...)
		} else {
			// If record is nil, this is a WAL recovery.
			s.metrics.recoveredEntriesTotal.Add(float64(len(storedEntries)))
		}

		s.tailerMtx.RLock()
		hasTailers := len(s.tailers) != 0
		s.tailerMtx.RUnlock()
		if hasTailers {
			go func() {
				stream := logproto.Stream{Labels: s.labelsString, Entries: storedEntries}

				closedTailers := []uint32{}

				s.tailerMtx.RLock()
				for _, tailer := range s.tailers {
					if tailer.isClosed() {
						closedTailers = append(closedTailers, tailer.getID())
						continue
					}
					tailer.send(stream, s.labels)
				}
				s.tailerMtx.RUnlock()

				if len(closedTailers) != 0 {
					s.tailerMtx.Lock()
					defer s.tailerMtx.Unlock()

					for _, closedTailerID := range closedTailers {
						delete(s.tailers, closedTailerID)
					}
				}
			}()
		}
	}

	if len(s.chunks) != prevNumChunks {
		memoryChunks.Add(float64(len(s.chunks) - prevNumChunks))
	}

	if len(failedEntriesWithError) > 0 {
		lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
		_, ok := lastEntryWithErr.e.(*validation.ErrStreamRateLimit)
		if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && !ok {
			return bytesAdded, lastEntryWithErr.e
		}
		var statusCode int
		if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
			statusCode = http.StatusBadRequest
		}
		if ok {
			statusCode = http.StatusTooManyRequests
		}
		// Return an HTTP 4xx response listing all failed entries.
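		// The body lists each rejected entry with its timestamp and reason, capped at
		// MaxReturnedErrors entries when that limit is configured, followed by a summary
		// line with the total number of ignored entries.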
		buf := bytes.Buffer{}
		streamName := s.labelsString

		limitedFailedEntries := failedEntriesWithError
		if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
			limitedFailedEntries = limitedFailedEntries[:maxIgnore]
		}

		for _, entryWithError := range limitedFailedEntries {
			fmt.Fprintf(&buf,
				"entry with timestamp %s ignored, reason: '%s' for stream: %s,\n",
				entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName)
		}

		fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))

		return bytesAdded, httpgrpc.Errorf(statusCode, buf.String())
	}

	return bytesAdded, nil
}

func (s *stream) cutChunk(ctx context.Context) *chunkDesc {
	// If the chunk has no more space, call Close to make sure anything in the head block is cut and compressed.
	chunk := &s.chunks[len(s.chunks)-1]
	err := chunk.chunk.Close()
	if err != nil {
		// This should be an unlikely situation; returning an error up the stack doesn't help much here,
		// so instead log it to help debug the issue if it ever arises.
		level.Error(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "failed to Close chunk", "err", err)
	}
	chunk.closed = true

	samplesPerChunk.Observe(float64(chunk.chunk.Size()))
	blocksPerChunk.Observe(float64(chunk.chunk.BlockCount()))
	chunksCreatedTotal.Inc()

	s.chunks = append(s.chunks, chunkDesc{
		chunk: s.NewChunk(),
	})
	return &s.chunks[len(s.chunks)-1]
}

// Returns true if the chunk should be cut before adding a new entry. This is done to make ingesters
// cut the chunk for this stream at the same moment, so that the new chunk will contain exactly the same entries.
func (s *stream) cutChunkForSynchronization(entryTimestamp, latestTs time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool {
	// Never sync when it's not enabled, it's the first push, or if a write isn't the latest ts,
	// to prevent syncing many unordered writes.
	if synchronizePeriod <= 0 || latestTs.IsZero() || latestTs.After(entryTimestamp) {
		return false
	}

	// We use the fingerprint as a jitter here, basically offsetting each stream's synchronization
	// point to a different position within the sync period.
	// This breaks if streams are mapped to different fingerprints on different ingesters, which is too bad.
	cts := (uint64(entryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())
	pts := (uint64(latestTs.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())

	// if the current entry timestamp has rolled over the synchronization period
	if cts < pts {
		if minUtilization <= 0 {
			c.synced = true
			return true
		}

		if c.chunk.Utilization() > minUtilization {
			c.synced = true
			return true
		}
	}

	return false
}

func (s *stream) Bounds() (from, to time.Time) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	if len(s.chunks) > 0 {
		from, _ = s.chunks[0].chunk.Bounds()
		_, to = s.chunks[len(s.chunks)-1].chunk.Bounds()
	}
	return from, to
}

// Returns an iterator.
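// Chunks that fall entirely outside the requested time range are skipped. If the
// remaining chunks are non-overlapping and in order, a non-overlapping iterator is
// returned; otherwise a heap iterator merges the overlapping chunks.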
func (s *stream) Iterator(ctx context.Context, ingStats *stats.IngesterData, from, through time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	iterators := make([]iter.EntryIterator, 0, len(s.chunks))

	var lastMax time.Time
	ordered := true

	for _, c := range s.chunks {
		mint, maxt := c.chunk.Bounds()

		// skip this chunk
		if through.Before(mint) || maxt.Before(from) {
			continue
		}

		if mint.Before(lastMax) {
			ordered = false
		}
		lastMax = maxt

		itr, err := c.chunk.Iterator(ctx, from, through, direction, pipeline)
		if err != nil {
			return nil, err
		}
		if itr != nil {
			iterators = append(iterators, itr)
		}
	}

	if direction != logproto.FORWARD {
		for left, right := 0, len(iterators)-1; left < right; left, right = left+1, right-1 {
			iterators[left], iterators[right] = iterators[right], iterators[left]
		}
	}

	if ingStats != nil {
		ingStats.TotalChunksMatched += int64(len(iterators))
	}

	if ordered {
		return iter.NewNonOverlappingIterator(iterators, ""), nil
	}
	return iter.NewHeapIterator(ctx, iterators, direction), nil
}

// Returns a SampleIterator.
func (s *stream) SampleIterator(ctx context.Context, ingStats *stats.IngesterData, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	iterators := make([]iter.SampleIterator, 0, len(s.chunks))

	var lastMax time.Time
	ordered := true

	for _, c := range s.chunks {
		mint, maxt := c.chunk.Bounds()

		// skip this chunk
		if through.Before(mint) || maxt.Before(from) {
			continue
		}

		if mint.Before(lastMax) {
			ordered = false
		}
		lastMax = maxt

		if itr := c.chunk.SampleIterator(ctx, from, through, extractor); itr != nil {
			iterators = append(iterators, itr)
		}
	}

	if ingStats != nil {
		ingStats.TotalChunksMatched += int64(len(iterators))
	}

	if ordered {
		return iter.NewNonOverlappingSampleIterator(iterators, ""), nil
	}
	return iter.NewHeapSampleIterator(ctx, iterators), nil
}

func (s *stream) addTailer(t *tailer) {
	s.tailerMtx.Lock()
	defer s.tailerMtx.Unlock()

	s.tailers[t.getID()] = t
}

func (s *stream) resetCounter() {
	s.entryCt = 0
}

func headBlockType(unorderedWrites bool) chunkenc.HeadBlockFmt {
	if unorderedWrites {
		return chunkenc.UnorderedHeadBlockFmt
	}
	return chunkenc.OrderedHeadBlockFmt
}