1package ingester
2
3import (
4	"context"
5	"net/http"
6	"sync"
7	"time"
8
9	"github.com/go-kit/log"
10	"github.com/go-kit/log/level"
11	"github.com/prometheus/client_golang/prometheus"
12	"github.com/prometheus/common/model"
13	"github.com/prometheus/prometheus/pkg/labels"
14	tsdb_record "github.com/prometheus/prometheus/tsdb/record"
15	"github.com/segmentio/fasthash/fnv1a"
16	"github.com/weaveworks/common/httpgrpc"
17
18	"github.com/cortexproject/cortex/pkg/cortexpb"
19	"github.com/cortexproject/cortex/pkg/ingester/client"
20	"github.com/cortexproject/cortex/pkg/ingester/index"
21	"github.com/cortexproject/cortex/pkg/tenant"
22	"github.com/cortexproject/cortex/pkg/util"
23	"github.com/cortexproject/cortex/pkg/util/extract"
24	util_math "github.com/cortexproject/cortex/pkg/util/math"
25	"github.com/cortexproject/cortex/pkg/util/spanlogger"
26	"github.com/cortexproject/cortex/pkg/util/validation"
27)
28
// userStates holds the userState object for all users (tenants),
// each one containing all the in-memory series for a given user.
//
// states maps a user ID (string) to its *userState. The limiter, cfg,
// metrics and logger are used when a new userState is built in getOrCreate;
// the logger is the base from which each per-user logger is derived.
type userStates struct {
	states  sync.Map
	limiter *Limiter
	cfg     Config
	metrics *ingesterMetrics
	logger  log.Logger
}
38
// userState holds the in-memory state of a single user (tenant): the
// fingerprint-to-series map together with the fingerprint locker, mapper and
// inverted index that accompany it, the ingestion-rate trackers, the
// active-series tracker, the per-metric series counter, and the metrics that
// are updated as series are created and removed.
//
// Instances are built by userStates.getOrCreate.
type userState struct {
	limiter             *Limiter
	userID              string
	fpLocker            *fingerprintLocker
	fpToSeries          *seriesMap
	mapper              *fpMapper
	index               *index.InvertedIndex
	ingestedAPISamples  *util_math.EwmaRate
	ingestedRuleSamples *util_math.EwmaRate
	activeSeries        *ActiveSeries
	logger              log.Logger

	// Number of series per metric name, used for the per-metric series limit.
	seriesInMetric *metricCounter

	// Series metrics.
	memSeries             prometheus.Gauge
	memSeriesCreatedTotal prometheus.Counter
	memSeriesRemovedTotal prometheus.Counter
	discardedSamples      *prometheus.CounterVec
	createdChunks         prometheus.Counter
	activeSeriesGauge     prometheus.Gauge
}
61
// DiscardedSamples metric labels. These are the reasons passed to
// makeLimitError and makeMetricLimitError when a series cannot be created
// because the per-user or the per-metric series limit was hit.
const (
	perUserSeriesLimit   = "per_user_series_limit"
	perMetricSeriesLimit = "per_metric_series_limit"
)
67
68func newUserStates(limiter *Limiter, cfg Config, metrics *ingesterMetrics, logger log.Logger) *userStates {
69	return &userStates{
70		limiter: limiter,
71		cfg:     cfg,
72		metrics: metrics,
73		logger:  logger,
74	}
75}
76
77func (us *userStates) cp() map[string]*userState {
78	states := map[string]*userState{}
79	us.states.Range(func(key, value interface{}) bool {
80		states[key.(string)] = value.(*userState)
81		return true
82	})
83	return states
84}
85
86//nolint:unused
87func (us *userStates) gc() {
88	us.states.Range(func(key, value interface{}) bool {
89		state := value.(*userState)
90		if state.fpToSeries.length() == 0 {
91			us.states.Delete(key)
92			state.activeSeries.clear()
93			state.activeSeriesGauge.Set(0)
94		}
95		return true
96	})
97}
98
99func (us *userStates) updateRates() {
100	us.states.Range(func(key, value interface{}) bool {
101		state := value.(*userState)
102		state.ingestedAPISamples.Tick()
103		state.ingestedRuleSamples.Tick()
104		return true
105	})
106}
107
108// Labels will be copied if they are kept.
109func (us *userStates) updateActiveSeriesForUser(userID string, now time.Time, lbls []labels.Label) {
110	if s, ok := us.get(userID); ok {
111		s.activeSeries.UpdateSeries(lbls, now, func(l labels.Labels) labels.Labels { return cortexpb.CopyLabels(l) })
112	}
113}
114
115func (us *userStates) purgeAndUpdateActiveSeries(purgeTime time.Time) {
116	us.states.Range(func(key, value interface{}) bool {
117		state := value.(*userState)
118		state.activeSeries.Purge(purgeTime)
119		state.activeSeriesGauge.Set(float64(state.activeSeries.Active()))
120		return true
121	})
122}
123
124func (us *userStates) get(userID string) (*userState, bool) {
125	state, ok := us.states.Load(userID)
126	if !ok {
127		return nil, ok
128	}
129	return state.(*userState), ok
130}
131
132func (us *userStates) getOrCreate(userID string) *userState {
133	state, ok := us.get(userID)
134	if !ok {
135
136		logger := log.With(us.logger, "user", userID)
137		// Speculatively create a userState object and try to store it
138		// in the map.  Another goroutine may have got there before
139		// us, in which case this userState will be discarded
140		state = &userState{
141			userID:              userID,
142			limiter:             us.limiter,
143			fpToSeries:          newSeriesMap(),
144			fpLocker:            newFingerprintLocker(16 * 1024),
145			index:               index.New(),
146			ingestedAPISamples:  util_math.NewEWMARate(0.2, us.cfg.RateUpdatePeriod),
147			ingestedRuleSamples: util_math.NewEWMARate(0.2, us.cfg.RateUpdatePeriod),
148			seriesInMetric:      newMetricCounter(us.limiter, us.cfg.getIgnoreSeriesLimitForMetricNamesMap()),
149			logger:              logger,
150
151			memSeries:             us.metrics.memSeries,
152			memSeriesCreatedTotal: us.metrics.memSeriesCreatedTotal.WithLabelValues(userID),
153			memSeriesRemovedTotal: us.metrics.memSeriesRemovedTotal.WithLabelValues(userID),
154			discardedSamples:      validation.DiscardedSamples.MustCurryWith(prometheus.Labels{"user": userID}),
155			createdChunks:         us.metrics.createdChunks,
156
157			activeSeries:      NewActiveSeries(),
158			activeSeriesGauge: us.metrics.activeSeriesPerUser.WithLabelValues(userID),
159		}
160		state.mapper = newFPMapper(state.fpToSeries, logger)
161		stored, ok := us.states.LoadOrStore(userID, state)
162		if !ok {
163			us.metrics.memUsers.Inc()
164		}
165		state = stored.(*userState)
166	}
167
168	return state
169}
170
171// teardown ensures metrics are accurately updated if a userStates struct is discarded
172func (us *userStates) teardown() {
173	for _, u := range us.cp() {
174		u.memSeriesRemovedTotal.Add(float64(u.fpToSeries.length()))
175		u.memSeries.Sub(float64(u.fpToSeries.length()))
176		u.activeSeriesGauge.Set(0)
177		us.metrics.memUsers.Dec()
178	}
179}
180
181func (us *userStates) getViaContext(ctx context.Context) (*userState, bool, error) {
182	userID, err := tenant.TenantID(ctx)
183	if err != nil {
184		return nil, false, err
185	}
186	state, ok := us.get(userID)
187	return state, ok, nil
188}
189
190// NOTE: memory for `labels` is unsafe; anything retained beyond the
191// life of this function must be copied
192func (us *userStates) getOrCreateSeries(ctx context.Context, userID string, labels []cortexpb.LabelAdapter, record *WALRecord) (*userState, model.Fingerprint, *memorySeries, error) {
193	state := us.getOrCreate(userID)
194	// WARNING: `err` may have a reference to unsafe memory in `labels`
195	fp, series, err := state.getSeries(labels, record)
196	return state, fp, series, err
197}
198
199// NOTE: memory for `metric` is unsafe; anything retained beyond the
200// life of this function must be copied
201func (u *userState) getSeries(metric labelPairs, record *WALRecord) (model.Fingerprint, *memorySeries, error) {
202	rawFP := client.FastFingerprint(metric)
203	u.fpLocker.Lock(rawFP)
204	fp := u.mapper.mapFP(rawFP, metric)
205	if fp != rawFP {
206		u.fpLocker.Unlock(rawFP)
207		u.fpLocker.Lock(fp)
208	}
209
210	series, ok := u.fpToSeries.get(fp)
211	if ok {
212		return fp, series, nil
213	}
214
215	series, err := u.createSeriesWithFingerprint(fp, metric, record, false)
216	if err != nil {
217		u.fpLocker.Unlock(fp)
218		return 0, nil, err
219	}
220
221	return fp, series, nil
222}
223
224func (u *userState) createSeriesWithFingerprint(fp model.Fingerprint, metric labelPairs, record *WALRecord, recovery bool) (*memorySeries, error) {
225	// There's theoretically a relatively harmless race here if multiple
226	// goroutines get the length of the series map at the same time, then
227	// all proceed to add a new series. This is likely not worth addressing,
228	// as this should happen rarely (all samples from one push are added
229	// serially), and the overshoot in allowed series would be minimal.
230
231	if !recovery {
232		if err := u.limiter.AssertMaxSeriesPerUser(u.userID, u.fpToSeries.length()); err != nil {
233			return nil, makeLimitError(perUserSeriesLimit, u.limiter.FormatError(u.userID, err))
234		}
235	}
236
237	// MetricNameFromLabelAdapters returns a copy of the string in `metric`
238	metricName, err := extract.MetricNameFromLabelAdapters(metric)
239	if err != nil {
240		return nil, err
241	}
242
243	if !recovery {
244		// Check if the per-metric limit has been exceeded
245		if err = u.seriesInMetric.canAddSeriesFor(u.userID, metricName); err != nil {
246			// WARNING: returns a reference to `metric`
247			return nil, makeMetricLimitError(perMetricSeriesLimit, cortexpb.FromLabelAdaptersToLabels(metric), u.limiter.FormatError(u.userID, err))
248		}
249	}
250
251	u.memSeriesCreatedTotal.Inc()
252	u.memSeries.Inc()
253	u.seriesInMetric.increaseSeriesForMetric(metricName)
254
255	if record != nil {
256		lbls := make(labels.Labels, 0, len(metric))
257		for _, m := range metric {
258			lbls = append(lbls, labels.Label(m))
259		}
260		record.Series = append(record.Series, tsdb_record.RefSeries{
261			Ref:    uint64(fp),
262			Labels: lbls,
263		})
264	}
265
266	labels := u.index.Add(metric, fp) // Add() returns 'interned' values so the original labels are not retained
267	series := newMemorySeries(labels, u.createdChunks)
268	u.fpToSeries.put(fp, series)
269
270	return series, nil
271}
272
273func (u *userState) removeSeries(fp model.Fingerprint, metric labels.Labels) {
274	u.fpToSeries.del(fp)
275	u.index.Delete(metric, fp)
276
277	metricName := metric.Get(model.MetricNameLabel)
278	if metricName == "" {
279		// Series without a metric name should never be able to make it into
280		// the ingester's memory storage.
281		panic("No metric name label")
282	}
283
284	u.seriesInMetric.decreaseSeriesForMetric(metricName)
285
286	u.memSeriesRemovedTotal.Inc()
287	u.memSeries.Dec()
288}
289
290// forSeriesMatching passes all series matching the given matchers to the
291// provided callback. Deals with locking and the quirks of zero-length matcher
292// values. There are 2 callbacks:
293// - The `add` callback is called for each series while the lock is held, and
294//   is intend to be used by the caller to build a batch.
295// - The `send` callback is called at certain intervals specified by batchSize
296//   with no locks held, and is intended to be used by the caller to send the
297//   built batches.
298func (u *userState) forSeriesMatching(ctx context.Context, allMatchers []*labels.Matcher,
299	add func(context.Context, model.Fingerprint, *memorySeries) error,
300	send func(context.Context) error, batchSize int,
301) error {
302	log, ctx := spanlogger.New(ctx, "forSeriesMatching")
303	defer log.Finish()
304
305	filters, matchers := util.SplitFiltersAndMatchers(allMatchers)
306	fps := u.index.Lookup(matchers)
307	if len(fps) > u.limiter.MaxSeriesPerQuery(u.userID) {
308		return httpgrpc.Errorf(http.StatusRequestEntityTooLarge, "exceeded maximum number of series in a query")
309	}
310
311	level.Debug(log).Log("series", len(fps))
312
313	// We only hold one FP lock at once here, so no opportunity to deadlock.
314	sent := 0
315outer:
316	for _, fp := range fps {
317		if err := ctx.Err(); err != nil {
318			return err
319		}
320
321		u.fpLocker.Lock(fp)
322		series, ok := u.fpToSeries.get(fp)
323		if !ok {
324			u.fpLocker.Unlock(fp)
325			continue
326		}
327
328		for _, filter := range filters {
329			if !filter.Matches(series.metric.Get(filter.Name)) {
330				u.fpLocker.Unlock(fp)
331				continue outer
332			}
333		}
334
335		err := add(ctx, fp, series)
336		u.fpLocker.Unlock(fp)
337		if err != nil {
338			return err
339		}
340
341		sent++
342		if batchSize > 0 && sent%batchSize == 0 && send != nil {
343			if err = send(ctx); err != nil {
344				return nil
345			}
346		}
347	}
348
349	if batchSize > 0 && sent%batchSize > 0 && send != nil {
350		return send(ctx)
351	}
352	return nil
353}
354
// numMetricCounterShards is the number of shards a metricCounter is built
// with; getShard reduces the hashed metric name modulo this value.
const numMetricCounterShards = 128
356
// metricCounterShard is one shard of a metricCounter: a map from metric name
// to a series count, with a mutex that the metricCounter methods hold while
// reading or updating the map.
type metricCounterShard struct {
	mtx sync.Mutex
	m   map[string]int
}
361
// metricCounter keeps a count of series per metric name, spread over a set of
// shards selected by hashing the metric name, and checks that count against
// the limiter's per-metric series limit. Metric names present in
// ignoredMetrics skip the limit check in canAddSeriesFor.
type metricCounter struct {
	limiter *Limiter
	shards  []metricCounterShard

	ignoredMetrics map[string]struct{}
}
368
369func newMetricCounter(limiter *Limiter, ignoredMetricsForSeriesCount map[string]struct{}) *metricCounter {
370	shards := make([]metricCounterShard, 0, numMetricCounterShards)
371	for i := 0; i < numMetricCounterShards; i++ {
372		shards = append(shards, metricCounterShard{
373			m: map[string]int{},
374		})
375	}
376	return &metricCounter{
377		limiter: limiter,
378		shards:  shards,
379
380		ignoredMetrics: ignoredMetricsForSeriesCount,
381	}
382}
383
384func (m *metricCounter) decreaseSeriesForMetric(metricName string) {
385	shard := m.getShard(metricName)
386	shard.mtx.Lock()
387	defer shard.mtx.Unlock()
388
389	shard.m[metricName]--
390	if shard.m[metricName] == 0 {
391		delete(shard.m, metricName)
392	}
393}
394
395func (m *metricCounter) getShard(metricName string) *metricCounterShard {
396	shard := &m.shards[util.HashFP(model.Fingerprint(fnv1a.HashString64(metricName)))%numMetricCounterShards]
397	return shard
398}
399
400func (m *metricCounter) canAddSeriesFor(userID, metric string) error {
401	if _, ok := m.ignoredMetrics[metric]; ok {
402		return nil
403	}
404
405	shard := m.getShard(metric)
406	shard.mtx.Lock()
407	defer shard.mtx.Unlock()
408
409	return m.limiter.AssertMaxSeriesPerMetric(userID, shard.m[metric])
410}
411
412func (m *metricCounter) increaseSeriesForMetric(metric string) {
413	shard := m.getShard(metric)
414	shard.mtx.Lock()
415	shard.m[metric]++
416	shard.mtx.Unlock()
417}
418