package storage

import (
	"context"
	"fmt"
	"time"

	util_log "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/cortexproject/cortex/pkg/util/spanlogger"
	"github.com/go-kit/kit/log/level"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/pkg/labels"

	"github.com/grafana/loki/pkg/storage/chunk"
)

// IngesterQuerier fetches chunk IDs from ingesters for streams matching the
// given matchers within the [from, through] range.
type IngesterQuerier interface {
	GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error)
}

// AsyncStore queries both the ingesters and the chunk store concurrently and
// combines the results after deduping them. It should be used with an async
// store like boltdb-shipper.
// AsyncStore is meant to be used only in queriers or any service other than
// ingesters. It must never be used in ingesters, otherwise they would end up
// querying each other for chunk IDs over and over again.
type AsyncStore struct {
	chunk.Store
	ingesterQuerier      IngesterQuerier
	queryIngestersWithin time.Duration
}

// NewAsyncStore wraps the given chunk.Store so that chunk lookups also
// consult the ingesters through querier. queryIngestersWithin limits
// ingester lookups to queries overlapping that recent window; 0 disables the
// cutoff.
func NewAsyncStore(store chunk.Store, querier IngesterQuerier, queryIngestersWithin time.Duration) *AsyncStore {
	return &AsyncStore{
		Store:                store,
		ingesterQuerier:      querier,
		queryIngestersWithin: queryIngestersWithin,
	}
}
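
// The sketch below shows how a querier might wire up an AsyncStore. It is
// illustrative only and is not used anywhere in this package; the helper name
// and the 3h window are assumptions, the latter mirroring a typical
// -querier.query-ingesters-within setting.
func newQuerierStoreSketch(chunkStore chunk.Store, querier IngesterQuerier) chunk.Store {
	// Chunks older than 3h are assumed to have been shipped to the store
	// already, so only queries overlapping the last 3h consult ingesters.
	return NewAsyncStore(chunkStore, querier, 3*time.Hour)
}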

// GetChunkRefs fetches chunk references from the chunk store and chunk IDs
// from the ingesters concurrently, then merges the two sets with duplicates
// removed.
func (a *AsyncStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
	spanLogger := spanlogger.FromContext(ctx)

	// Buffer both sends so that neither goroutine leaks if we return early on
	// the first error.
	errs := make(chan error, 2)

	var storeChunks [][]chunk.Chunk
	var fetchers []*chunk.Fetcher
	go func() {
		var err error
		storeChunks, fetchers, err = a.Store.GetChunkRefs(ctx, userID, from, through, matchers...)
		errs <- err
	}()

	var ingesterChunks []string

	go func() {
		if a.queryIngestersWithin != 0 {
			// don't query ingesters if the query does not overlap with queryIngestersWithin.
			if !through.After(model.Now().Add(-a.queryIngestersWithin)) {
				level.Debug(util_log.Logger).Log("msg", "skipping querying ingesters for chunk ids", "query-from", from, "query-through", through)
				errs <- nil
				return
			}
		}

		var err error
		ingesterChunks, err = a.ingesterQuerier.GetChunkIDs(ctx, from, through, matchers...)

		if err == nil {
			level.Debug(spanLogger).Log("ingester-chunks-count", len(ingesterChunks))
			level.Debug(util_log.Logger).Log("msg", "got chunk ids from ingester", "count", len(ingesterChunks))
		}
		errs <- err
	}()

	// Wait for both lookups, failing fast on the first error.
	for i := 0; i < 2; i++ {
		err := <-errs
		if err != nil {
			return nil, nil, err
		}
	}

	if len(ingesterChunks) == 0 {
		return storeChunks, fetchers, nil
	}

	return a.mergeIngesterAndStoreChunks(userID, storeChunks, fetchers, ingesterChunks)
}
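
// shouldQueryIngesters restates the cutoff check from GetChunkRefs as a
// standalone helper, purely to make the time arithmetic explicit. It is a
// sketch (the helper name is an assumption) and is not called by AsyncStore
// itself: ingesters are consulted only when the query's through timestamp
// falls within the last queryIngestersWithin window.
func shouldQueryIngesters(through model.Time, queryIngestersWithin time.Duration) bool {
	if queryIngestersWithin == 0 {
		// A zero window disables the cutoff: always query ingesters.
		return true
	}
	// e.g. with queryIngestersWithin=3h, a query whose through is 4h old is
	// served by the store alone.
	return through.After(model.Now().Add(-queryIngestersWithin))
}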

// mergeIngesterAndStoreChunks appends each ingester chunk that is not already
// present in the store results to the chunk group belonging to the fetcher
// responsible for that chunk's time range, adding new groups for fetchers the
// store did not return.
func (a *AsyncStore) mergeIngesterAndStoreChunks(userID string, storeChunks [][]chunk.Chunk, fetchers []*chunk.Fetcher, ingesterChunkIDs []string) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
	ingesterChunkIDs = filterDuplicateChunks(storeChunks, ingesterChunkIDs)
	level.Debug(util_log.Logger).Log("msg", "post-filtering ingester chunks", "count", len(ingesterChunkIDs))

	fetcherToChunksGroupIdx := make(map[*chunk.Fetcher]int, len(fetchers))

	for i, fetcher := range fetchers {
		fetcherToChunksGroupIdx[fetcher] = i
	}

	for _, chunkID := range ingesterChunkIDs {
		chk, err := chunk.ParseExternalKey(userID, chunkID)
		if err != nil {
			return nil, nil, err
		}

		// TODO(Sandeep): possible optimization: keep the chunk fetcher reference handy after the first call since it is expected to stay the same.
		fetcher := a.Store.GetChunkFetcher(chk.Through)
		if fetcher == nil {
			return nil, nil, fmt.Errorf("got a nil fetcher for chunk %s", chk.ExternalKey())
		}

		if _, ok := fetcherToChunksGroupIdx[fetcher]; !ok {
			fetchers = append(fetchers, fetcher)
			storeChunks = append(storeChunks, []chunk.Chunk{})
			fetcherToChunksGroupIdx[fetcher] = len(fetchers) - 1
		}
		chunksGroupIdx := fetcherToChunksGroupIdx[fetcher]

		storeChunks[chunksGroupIdx] = append(storeChunks[chunksGroupIdx], chk)
	}

	return storeChunks, fetchers, nil
}

// filterDuplicateChunks drops ingester chunk IDs that are already present in
// the store results and dedupes the remaining ingester IDs among themselves.
func filterDuplicateChunks(storeChunks [][]chunk.Chunk, ingesterChunkIDs []string) []string {
	filteredChunkIDs := make([]string, 0, len(ingesterChunkIDs))
	seen := make(map[string]struct{}, len(storeChunks))

	for i := range storeChunks {
		for j := range storeChunks[i] {
			seen[storeChunks[i][j].ExternalKey()] = struct{}{}
		}
	}

	for _, chunkID := range ingesterChunkIDs {
		if _, ok := seen[chunkID]; !ok {
			filteredChunkIDs = append(filteredChunkIDs, chunkID)
			seen[chunkID] = struct{}{}
		}
	}

	return filteredChunkIDs
}
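
// As a worked example of the dedupe semantics above: if the store returned
// chunks with external keys {"a", "b"} and the ingesters returned IDs
// {"b", "c", "c"}, filterDuplicateChunks yields just {"c"}: "b" is dropped as
// a store duplicate and the second "c" as an ingester-side duplicate.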