package ingester

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"sync"
	"time"

	util_log "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/go-kit/kit/log/level"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"
	"github.com/weaveworks/common/httpgrpc"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql/log"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
	"github.com/grafana/loki/pkg/util/flagext"
	"github.com/grafana/loki/pkg/validation"
)

var (
	chunksCreatedTotal = prometheus.NewCounter(prometheus.CounterOpts{
		Namespace: "loki",
		Name:      "ingester_chunks_created_total",
		Help:      "The total number of chunks created in the ingester.",
	})
	samplesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "loki",
		Subsystem: "ingester",
		Name:      "samples_per_chunk",
		Help:      "The number of samples in a chunk.",

		Buckets: prometheus.LinearBuckets(4096, 2048, 6),
	})
	blocksPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
		Namespace: "loki",
		Subsystem: "ingester",
		Name:      "blocks_per_chunk",
		Help:      "The number of blocks in a chunk.",

		Buckets: prometheus.ExponentialBuckets(5, 2, 6),
	})
)

var (
	ErrEntriesExist = errors.New("duplicate push - entries already exist")
)

func init() {
	prometheus.MustRegister(chunksCreatedTotal)
	prometheus.MustRegister(samplesPerChunk)
	prometheus.MustRegister(blocksPerChunk)
}

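// line captures the timestamp and content of a pushed entry, used to detect
// exact duplicates of the most recent push.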
type line struct {
	ts      time.Time
	content string
}

type stream struct {
	limiter *StreamRateLimiter
	cfg     *Config
	tenant  string
	// Newest chunk at chunks[n-1].
	// Not thread-safe; assume accesses to this are locked by caller.
	chunks   []chunkDesc
	fp       model.Fingerprint // possibly remapped fingerprint, used in the streams map
	chunkMtx sync.RWMutex

	labels       labels.Labels
	labelsString string

	// most recently pushed line. This is used to prevent duplicate pushes.
	// It also determines chunk synchronization when unordered writes are disabled.
	lastLine line

	// keeps track of the highest timestamp accepted by the stream.
	// This is used when unordered writes are enabled to cap the validity window
	// of accepted writes and for chunk synchronization.
	highestTs time.Time

	metrics *ingesterMetrics

	tailers   map[uint32]*tailer
	tailerMtx sync.RWMutex

	// entryCt is a counter which is incremented on each accepted entry.
	// This allows us to discard WAL entries during replays which were
	// already recovered via checkpoints. Historically out of order
	// errors were used to detect this, but this counter has been
	// introduced to facilitate removing the ordering constraint.
	entryCt int64

	unorderedWrites bool
}

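// chunkDesc pairs an in-memory chunk with its bookkeeping: whether it has
// been closed or cut for synchronization, when it was flushed, and when it
// last received data.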
type chunkDesc struct {
	chunk   *chunkenc.MemChunk
	closed  bool
	synced  bool
	flushed time.Time

	lastUpdated time.Time
}

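// entryWithError pairs a rejected entry with the reason it was rejected, so
// all failures from a push can be reported together.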
type entryWithError struct {
	entry *logproto.Entry
	e     error
}

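// newStream constructs an empty stream for the given tenant and label set.
// Chunks are allocated lazily on the first push.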
func newStream(cfg *Config, limits RateLimiterStrategy, tenant string, fp model.Fingerprint, labels labels.Labels, unorderedWrites bool, metrics *ingesterMetrics) *stream {
	return &stream{
		limiter:         NewStreamRateLimiter(limits, tenant, 10*time.Second),
		cfg:             cfg,
		fp:              fp,
		labels:          labels,
		labelsString:    labels.String(),
		tailers:         map[uint32]*tailer{},
		metrics:         metrics,
		tenant:          tenant,
		unorderedWrites: unorderedWrites,
	}
}

// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
// Deprecated: chunk transfers are no longer recommended; this remains for
// backwards compatibility.
func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
	c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize)
	if err != nil {
		return err
	}

	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()
	s.chunks = append(s.chunks, chunkDesc{
		chunk: c,
	})
	chunksCreatedTotal.Inc()
	return nil
}

// setChunks is used during checkpoint recovery
func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err error) {
	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()
	chks, err := fromWireChunks(s.cfg, chunks)
	if err != nil {
		return 0, 0, err
	}
	s.chunks = chks
	for _, c := range s.chunks {
		entriesAdded += c.chunk.Size()
		bytesAdded += c.chunk.UncompressedSize()
	}
	return bytesAdded, entriesAdded, nil
}

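// NewChunk allocates an empty MemChunk using the stream's configured
// encoding, block size, and target chunk size. The head block format depends
// on whether unordered writes are enabled.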
func (s *stream) NewChunk() *chunkenc.MemChunk {
	return chunkenc.NewMemChunk(s.cfg.parsedEncoding, headBlockType(s.unorderedWrites), s.cfg.BlockSize, s.cfg.TargetChunkSize)
}

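// Push appends entries to the stream and returns the number of bytes
// successfully stored. A push can partially succeed: duplicates of the last
// line are silently skipped, while rate-limited and out-of-order entries are
// rejected and reported together in a single 4xx error.
//
// A minimal call site outside of WAL replay (sketch; record and counter are
// only meaningful during replays):
//
//	bytesAdded, err := s.Push(ctx, entries, nil, 0)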
func (s *stream) Push(
	ctx context.Context,
	entries []logproto.Entry,
	// WAL record to add push contents to.
	// May be nil to disable this functionality.
	record *WALRecord,
	// Counter used in WAL replay to avoid duplicates.
	// If this is non-zero, the stream will reject entries
	// with a counter value less than or equal to its own.
	// It is set to zero and thus bypassed outside of WAL replays.
	counter int64,
) (int, error) {
	s.chunkMtx.Lock()
	defer s.chunkMtx.Unlock()

	if counter > 0 && counter <= s.entryCt {
		var byteCt int
		for _, e := range entries {
			byteCt += len(e.Line)
		}

		s.metrics.walReplaySamplesDropped.WithLabelValues(duplicateReason).Add(float64(len(entries)))
		s.metrics.walReplayBytesDropped.WithLabelValues(duplicateReason).Add(float64(byteCt))
		return 0, ErrEntriesExist
	}

	var bytesAdded int
	prevNumChunks := len(s.chunks)
	if prevNumChunks == 0 {
		s.chunks = append(s.chunks, chunkDesc{
			chunk: s.NewChunk(),
		})
		chunksCreatedTotal.Inc()
	}

	var storedEntries []logproto.Entry
	failedEntriesWithError := []entryWithError{}

	var outOfOrderSamples, outOfOrderBytes int
	var rateLimitedSamples, rateLimitedBytes int
	defer func() {
		if outOfOrderSamples > 0 {
			validation.DiscardedSamples.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderSamples))
			validation.DiscardedBytes.WithLabelValues(validation.OutOfOrder, s.tenant).Add(float64(outOfOrderBytes))
		}
		if rateLimitedSamples > 0 {
			validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
			validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes))
		}
	}()

	// Limit() takes a mutex under the hood, so cache the result: we check the
	// limit for every entry in the push (hot path) but only need this value
	// when reporting entries that exceed the rate limit.
	limit := s.limiter.lim.Limit()

	// Don't fail on the first append error - if samples are sent out of order,
	// we still want to append the later ones.
	for i := range entries {
		// If this entry matches our last appended line's timestamp and contents,
		// ignore it.
		//
		// This check is done at the stream level so it persists across cut and
		// flushed chunks.
		//
		// NOTE: it's still possible for duplicates to be appended if a stream is
		// deleted from inactivity.
		if entries[i].Timestamp.Equal(s.lastLine.ts) && entries[i].Line == s.lastLine.content {
			continue
		}

		chunk := &s.chunks[len(s.chunks)-1]
		if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, s.highestTs, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
			chunk = s.cutChunk(ctx)
		}
		// Check if this entry should be rate limited.
		now := time.Now()
		if !s.limiter.AllowN(now, len(entries[i].Line)) {
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(entries[i].Line))}})
			rateLimitedSamples++
			rateLimitedBytes += len(entries[i].Line)
			continue
		}

		// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
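		// For example, with a 2h MaxChunkAge and a highestTs of 12:00, entries
		// timestamped before 11:00 are rejected as out of order.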
		if s.unorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrOutOfOrder})
			outOfOrderSamples++
			outOfOrderBytes += len(entries[i].Line)
		} else if err := chunk.chunk.Append(&entries[i]); err != nil {
			failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
			if err == chunkenc.ErrOutOfOrder {
				outOfOrderSamples++
				outOfOrderBytes += len(entries[i].Line)
			}
		} else {
			storedEntries = append(storedEntries, entries[i])
			s.lastLine.ts = entries[i].Timestamp
			s.lastLine.content = entries[i].Line
			if s.highestTs.Before(entries[i].Timestamp) {
				s.highestTs = entries[i].Timestamp
			}
			s.entryCt++

			// Track the bytes successfully added (length of the line).
			bytesAdded += len(entries[i].Line)
		}
		chunk.lastUpdated = time.Now()
	}

	if len(storedEntries) != 0 {
		// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
		if record != nil {
			record.AddEntries(uint64(s.fp), s.entryCt, storedEntries...)
		} else {
			// If record is nil, this is a WAL recovery.
			s.metrics.recoveredEntriesTotal.Add(float64(len(storedEntries)))
		}

		s.tailerMtx.RLock()
		hasTailers := len(s.tailers) != 0
		s.tailerMtx.RUnlock()
		if hasTailers {
			go func() {
				stream := logproto.Stream{Labels: s.labelsString, Entries: storedEntries}

				closedTailers := []uint32{}

				s.tailerMtx.RLock()
				for _, tailer := range s.tailers {
					if tailer.isClosed() {
						closedTailers = append(closedTailers, tailer.getID())
						continue
					}
					tailer.send(stream, s.labels)
				}
				s.tailerMtx.RUnlock()

				if len(closedTailers) != 0 {
					s.tailerMtx.Lock()
					defer s.tailerMtx.Unlock()

					for _, closedTailerID := range closedTailers {
						delete(s.tailers, closedTailerID)
					}
				}
			}()
		}
	}

	if len(s.chunks) != prevNumChunks {
		memoryChunks.Add(float64(len(s.chunks) - prevNumChunks))
	}

	if len(failedEntriesWithError) > 0 {
		lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
		_, ok := lastEntryWithErr.e.(*validation.ErrStreamRateLimit)
		if lastEntryWithErr.e != chunkenc.ErrOutOfOrder && !ok {
			return bytesAdded, lastEntryWithErr.e
		}
		var statusCode int
		if lastEntryWithErr.e == chunkenc.ErrOutOfOrder {
			statusCode = http.StatusBadRequest
		}
		if ok {
			statusCode = http.StatusTooManyRequests
		}
		// Return an HTTP 4xx response containing all failed entries.
		buf := bytes.Buffer{}
		streamName := s.labelsString

		limitedFailedEntries := failedEntriesWithError
		if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
			limitedFailedEntries = limitedFailedEntries[:maxIgnore]
		}

		for _, entryWithError := range limitedFailedEntries {
			fmt.Fprintf(&buf,
				"entry with timestamp %s ignored, reason: '%s' for stream: %s,\n",
				entryWithError.entry.Timestamp.String(), entryWithError.e.Error(), streamName)
		}

		fmt.Fprintf(&buf, "total ignored: %d out of %d", len(failedEntriesWithError), len(entries))

		// Pass the message via "%s" so '%' characters in log lines or labels
		// aren't misinterpreted as format verbs.
		return bytesAdded, httpgrpc.Errorf(statusCode, "%s", buf.String())
	}

	return bytesAdded, nil
}

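// cutChunk closes the current head chunk, compressing anything left in its
// head block, and appends a fresh chunk to the stream, returning a pointer
// to the new chunk.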
func (s *stream) cutChunk(ctx context.Context) *chunkDesc {
	// If the chunk has no more space, call Close to make sure anything in the head block is cut and compressed.
	chunk := &s.chunks[len(s.chunks)-1]
	err := chunk.chunk.Close()
	if err != nil {
		// This should be an unlikely situation; returning an error up the stack doesn't help much here,
		// so instead log it to help debug the issue if it ever arises.
		level.Error(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "failed to Close chunk", "err", err)
	}
	chunk.closed = true

	samplesPerChunk.Observe(float64(chunk.chunk.Size()))
	blocksPerChunk.Observe(float64(chunk.chunk.BlockCount()))
	chunksCreatedTotal.Inc()

	s.chunks = append(s.chunks, chunkDesc{
		chunk: s.NewChunk(),
	})
	return &s.chunks[len(s.chunks)-1]
}

// cutChunkForSynchronization returns true if the chunk should be cut before
// adding a new entry. This makes different ingesters cut the chunk for a
// given stream at the same moment, so that the new chunks contain exactly
// the same entries.
func (s *stream) cutChunkForSynchronization(entryTimestamp, latestTs time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool {
	// Never sync when it's not enabled, it's the first push, or if a write isn't the latest ts
	// to prevent syncing many unordered writes.
	if synchronizePeriod <= 0 || latestTs.IsZero() || latestTs.After(entryTimestamp) {
		return false
	}

	// We use the fingerprint as jitter here, offsetting the synchronization
	// points of different streams to different times. This breaks if streams
	// are mapped to different fingerprints on different ingesters, which is
	// too bad.
	cts := (uint64(entryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())
	pts := (uint64(latestTs.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())

	// if current entry timestamp has rolled over synchronization period
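	// (e.g. with a 1h period and zero jitter, a previous entry at 12:59 gives
	// pts=59m and a new entry at 13:01 gives cts=1m, so cts < pts and we cut)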
	if cts < pts {
		if minUtilization <= 0 {
			c.synced = true
			return true
		}

		if c.chunk.Utilization() > minUtilization {
			c.synced = true
			return true
		}
	}

	return false
}

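// Bounds returns the time range covered by the stream's in-memory chunks,
// from the start of the oldest chunk to the end of the newest.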
func (s *stream) Bounds() (from, to time.Time) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	if len(s.chunks) > 0 {
		from, _ = s.chunks[0].chunk.Bounds()
		_, to = s.chunks[len(s.chunks)-1].chunk.Bounds()
	}
	return from, to
}

// Iterator returns an entry iterator over the stream's chunks between from
// and through.
func (s *stream) Iterator(ctx context.Context, ingStats *stats.IngesterData, from, through time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	iterators := make([]iter.EntryIterator, 0, len(s.chunks))

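	// Track whether chunk time ranges ever overlap; if they don't, we can use
	// the cheaper non-overlapping iterator instead of a heap.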
	var lastMax time.Time
	ordered := true

	for _, c := range s.chunks {
		mint, maxt := c.chunk.Bounds()

		// skip this chunk
		if through.Before(mint) || maxt.Before(from) {
			continue
		}

		if mint.Before(lastMax) {
			ordered = false
		}
		lastMax = maxt

		itr, err := c.chunk.Iterator(ctx, from, through, direction, pipeline)
		if err != nil {
			return nil, err
		}
		if itr != nil {
			iterators = append(iterators, itr)
		}
	}

	if direction != logproto.FORWARD {
		for left, right := 0, len(iterators)-1; left < right; left, right = left+1, right-1 {
			iterators[left], iterators[right] = iterators[right], iterators[left]
		}
	}

	if ingStats != nil {
		ingStats.TotalChunksMatched += int64(len(iterators))
	}

	if ordered {
		return iter.NewNonOverlappingIterator(iterators, ""), nil
	}
	return iter.NewHeapIterator(ctx, iterators, direction), nil
}

// SampleIterator returns a sample iterator over the stream's chunks between
// from and through; it mirrors Iterator.
func (s *stream) SampleIterator(ctx context.Context, ingStats *stats.IngesterData, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) {
	s.chunkMtx.RLock()
	defer s.chunkMtx.RUnlock()
	iterators := make([]iter.SampleIterator, 0, len(s.chunks))

	var lastMax time.Time
	ordered := true

	for _, c := range s.chunks {
		mint, maxt := c.chunk.Bounds()

		// skip this chunk
		if through.Before(mint) || maxt.Before(from) {
			continue
		}

		if mint.Before(lastMax) {
			ordered = false
		}
		lastMax = maxt

		if itr := c.chunk.SampleIterator(ctx, from, through, extractor); itr != nil {
			iterators = append(iterators, itr)
		}
	}

	if ingStats != nil {
		ingStats.TotalChunksMatched += int64(len(iterators))
	}

	if ordered {
		return iter.NewNonOverlappingSampleIterator(iterators, ""), nil
	}
	return iter.NewHeapSampleIterator(ctx, iterators), nil
}

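// addTailer registers a live-tailing connection; entries accepted by Push
// are fanned out to all registered tailers.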
func (s *stream) addTailer(t *tailer) {
	s.tailerMtx.Lock()
	defer s.tailerMtx.Unlock()

	s.tailers[t.getID()] = t
}

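// resetCounter resets the entry counter used to deduplicate WAL replays.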
func (s *stream) resetCounter() {
	s.entryCt = 0
}

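// headBlockType maps the unordered-writes setting to the corresponding head
// block format.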
func headBlockType(unorderedWrites bool) chunkenc.HeadBlockFmt {
	if unorderedWrites {
		return chunkenc.UnorderedHeadBlockFmt
	}
	return chunkenc.OrderedHeadBlockFmt
}