1package chunk
2
3import (
4	"context"
5	"errors"
6	"sort"
7	"time"
8
9	"github.com/prometheus/common/model"
10	"github.com/prometheus/prometheus/pkg/labels"
11
12	"github.com/grafana/loki/pkg/storage/chunk/cache"
13)
14
15// StoreLimits helps get Limits specific to Queries for Stores
16type StoreLimits interface {
17	MaxChunksPerQueryFromStore(userID string) int
18	MaxQueryLength(userID string) time.Duration
19}
20
21type CacheGenNumLoader interface {
22	GetStoreCacheGenNumber(tenantIDs []string) string
23}
24
25// Store for chunks.
26type Store interface {
27	Put(ctx context.Context, chunks []Chunk) error
28	PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
29	Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
30	// GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk),
31	// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)
32	GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
33	LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error)
34	LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
35	GetChunkFetcher(tm model.Time) *Fetcher
36
37	// DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage.
38	// It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk
39	DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error
40	// DeleteSeriesIDs is only relevant for SeriesStore.
41	DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error
42	Stop()
43}
44
45// CompositeStore is a Store which delegates to various stores depending
46// on when they were activated.
47type CompositeStore struct {
48	compositeStore
49}
50
51type compositeStore struct {
52	cacheGenNumLoader CacheGenNumLoader
53	stores            []compositeStoreEntry
54}
55
56type compositeStoreEntry struct {
57	start model.Time
58	Store
59}
60
61// NewCompositeStore creates a new Store which delegates to different stores depending
62// on time.
63func NewCompositeStore(cacheGenNumLoader CacheGenNumLoader) CompositeStore {
64	return CompositeStore{compositeStore{cacheGenNumLoader: cacheGenNumLoader}}
65}
66
67// AddPeriod adds the configuration for a period of time to the CompositeStore
68func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error {
69	schema, err := cfg.CreateSchema()
70	if err != nil {
71		return err
72	}
73
74	return c.addSchema(storeCfg, schema, cfg.From.Time, index, chunks, limits, chunksCache, writeDedupeCache)
75}
76
77func (c *CompositeStore) addSchema(storeCfg StoreConfig, schema BaseSchema, start model.Time, index IndexClient, chunks Client, limits StoreLimits, chunksCache, writeDedupeCache cache.Cache) error {
78	var (
79		err   error
80		store Store
81	)
82
83	switch s := schema.(type) {
84	case SeriesStoreSchema:
85		store, err = newSeriesStore(storeCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
86	case StoreSchema:
87		store, err = newStore(storeCfg, s, index, chunks, limits, chunksCache)
88	default:
89		err = errors.New("invalid schema type")
90	}
91	if err != nil {
92		return err
93	}
94	c.stores = append(c.stores, compositeStoreEntry{start: start, Store: store})
95	return nil
96}
97
98func (c compositeStore) Put(ctx context.Context, chunks []Chunk) error {
99	for _, chunk := range chunks {
100		err := c.forStores(ctx, chunk.UserID, chunk.From, chunk.Through, func(innerCtx context.Context, from, through model.Time, store Store) error {
101			return store.PutOne(innerCtx, from, through, chunk)
102		})
103		if err != nil {
104			return err
105		}
106	}
107	return nil
108}
109
110func (c compositeStore) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error {
111	return c.forStores(ctx, chunk.UserID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
112		return store.PutOne(innerCtx, from, through, chunk)
113	})
114}
115
116func (c compositeStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) {
117	var results []Chunk
118	err := c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
119		chunks, err := store.Get(innerCtx, userID, from, through, matchers...)
120		if err != nil {
121			return err
122		}
123		results = append(results, chunks...)
124		return nil
125	})
126	return results, err
127}
128
129// LabelValuesForMetricName retrieves all label values for a single label name and metric name.
130func (c compositeStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string) ([]string, error) {
131	var result UniqueStrings
132	err := c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
133		labelValues, err := store.LabelValuesForMetricName(innerCtx, userID, from, through, metricName, labelName)
134		if err != nil {
135			return err
136		}
137		result.Add(labelValues...)
138		return nil
139	})
140	return result.Strings(), err
141}
142
143// LabelNamesForMetricName retrieves all label names for a metric name.
144func (c compositeStore) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
145	var result UniqueStrings
146	err := c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
147		labelNames, err := store.LabelNamesForMetricName(innerCtx, userID, from, through, metricName)
148		if err != nil {
149			return err
150		}
151		result.Add(labelNames...)
152		return nil
153	})
154	return result.Strings(), err
155}
156
157func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
158	chunkIDs := [][]Chunk{}
159	fetchers := []*Fetcher{}
160	err := c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
161		ids, fetcher, err := store.GetChunkRefs(innerCtx, userID, from, through, matchers...)
162		if err != nil {
163			return err
164		}
165
166		// Skip it if there are no chunks.
167		if len(ids) == 0 {
168			return nil
169		}
170
171		chunkIDs = append(chunkIDs, ids...)
172		fetchers = append(fetchers, fetcher...)
173		return nil
174	})
175	return chunkIDs, fetchers, err
176}
177
178func (c compositeStore) GetChunkFetcher(tm model.Time) *Fetcher {
179	// find the schema with the lowest start _after_ tm
180	j := sort.Search(len(c.stores), func(j int) bool {
181		return c.stores[j].start > tm
182	})
183
184	// reduce it by 1 because we want a schema with start <= tm
185	j--
186
187	if 0 <= j && j < len(c.stores) {
188		return c.stores[j].GetChunkFetcher(tm)
189	}
190
191	return nil
192}
193
194// DeleteSeriesIDs deletes series IDs from index in series store
195func (c CompositeStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
196	return c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
197		return store.DeleteSeriesIDs(innerCtx, from, through, userID, metric)
198	})
199}
200
201// DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage.
202// It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk
203func (c CompositeStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error {
204	return c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
205		return store.DeleteChunk(innerCtx, from, through, userID, chunkID, metric, partiallyDeletedInterval)
206	})
207}
208
209func (c compositeStore) Stop() {
210	for _, store := range c.stores {
211		store.Stop()
212	}
213}
214
215func (c compositeStore) forStores(ctx context.Context, userID string, from, through model.Time, callback func(innerCtx context.Context, from, through model.Time, store Store) error) error {
216	if len(c.stores) == 0 {
217		return nil
218	}
219
220	ctx = c.injectCacheGen(ctx, []string{userID})
221
222	// first, find the schema with the highest start _before or at_ from
223	i := sort.Search(len(c.stores), func(i int) bool {
224		return c.stores[i].start > from
225	})
226	if i > 0 {
227		i--
228	} else {
229		// This could happen if we get passed a sample from before 1970.
230		i = 0
231		from = c.stores[0].start
232	}
233
234	// next, find the schema with the lowest start _after_ through
235	j := sort.Search(len(c.stores), func(j int) bool {
236		return c.stores[j].start > through
237	})
238
239	min := func(a, b model.Time) model.Time {
240		if a < b {
241			return a
242		}
243		return b
244	}
245
246	start := from
247	for ; i < j; i++ {
248		nextSchemaStarts := model.Latest
249		if i+1 < len(c.stores) {
250			nextSchemaStarts = c.stores[i+1].start
251		}
252
253		end := min(through, nextSchemaStarts-1)
254		err := callback(ctx, start, end, c.stores[i].Store)
255		if err != nil {
256			return err
257		}
258
259		start = nextSchemaStarts
260	}
261
262	return nil
263}
264
265func (c compositeStore) injectCacheGen(ctx context.Context, tenantIDs []string) context.Context {
266	if c.cacheGenNumLoader == nil {
267		return ctx
268	}
269
270	return cache.InjectCacheGenNumber(ctx, c.cacheGenNumLoader.GetStoreCacheGenNumber(tenantIDs))
271}
272