1package storage 2 3import ( 4 "context" 5 "fmt" 6 "time" 7 8 util_log "github.com/cortexproject/cortex/pkg/util/log" 9 "github.com/cortexproject/cortex/pkg/util/spanlogger" 10 "github.com/go-kit/kit/log/level" 11 "github.com/prometheus/common/model" 12 "github.com/prometheus/prometheus/pkg/labels" 13 14 "github.com/grafana/loki/pkg/storage/chunk" 15) 16 17type IngesterQuerier interface { 18 GetChunkIDs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]string, error) 19} 20 21// AsyncStore does querying to both ingesters and chunk store and combines the results after deduping them. 22// This should be used when using an async store like boltdb-shipper. 23// AsyncStore is meant to be used only in queriers or any other service other than ingesters. 24// It should never be used in ingesters otherwise it would start spiraling around doing queries over and over again to other ingesters. 25type AsyncStore struct { 26 chunk.Store 27 ingesterQuerier IngesterQuerier 28 queryIngestersWithin time.Duration 29} 30 31func NewAsyncStore(store chunk.Store, querier IngesterQuerier, queryIngestersWithin time.Duration) *AsyncStore { 32 return &AsyncStore{ 33 Store: store, 34 ingesterQuerier: querier, 35 queryIngestersWithin: queryIngestersWithin, 36 } 37} 38 39func (a *AsyncStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) { 40 spanLogger := spanlogger.FromContext(ctx) 41 42 errs := make(chan error) 43 44 var storeChunks [][]chunk.Chunk 45 var fetchers []*chunk.Fetcher 46 go func() { 47 var err error 48 storeChunks, fetchers, err = a.Store.GetChunkRefs(ctx, userID, from, through, matchers...) 49 errs <- err 50 }() 51 52 var ingesterChunks []string 53 54 go func() { 55 if a.queryIngestersWithin != 0 { 56 // don't query ingesters if the query does not overlap with queryIngestersWithin. 57 if !through.After(model.Now().Add(-a.queryIngestersWithin)) { 58 level.Debug(util_log.Logger).Log("msg", "skipping querying ingesters for chunk ids", "query-from", from, "query-through", through) 59 errs <- nil 60 return 61 } 62 } 63 64 var err error 65 ingesterChunks, err = a.ingesterQuerier.GetChunkIDs(ctx, from, through, matchers...) 66 67 if err == nil { 68 level.Debug(spanLogger).Log("ingester-chunks-count", len(ingesterChunks)) 69 level.Debug(util_log.Logger).Log("msg", "got chunk ids from ingester", "count", len(ingesterChunks)) 70 } 71 errs <- err 72 }() 73 74 for i := 0; i < 2; i++ { 75 err := <-errs 76 if err != nil { 77 return nil, nil, err 78 } 79 } 80 81 if len(ingesterChunks) == 0 { 82 return storeChunks, fetchers, nil 83 } 84 85 return a.mergeIngesterAndStoreChunks(userID, storeChunks, fetchers, ingesterChunks) 86} 87 88func (a *AsyncStore) mergeIngesterAndStoreChunks(userID string, storeChunks [][]chunk.Chunk, fetchers []*chunk.Fetcher, ingesterChunkIDs []string) ([][]chunk.Chunk, []*chunk.Fetcher, error) { 89 ingesterChunkIDs = filterDuplicateChunks(storeChunks, ingesterChunkIDs) 90 level.Debug(util_log.Logger).Log("msg", "post-filtering ingester chunks", "count", len(ingesterChunkIDs)) 91 92 fetcherToChunksGroupIdx := make(map[*chunk.Fetcher]int, len(fetchers)) 93 94 for i, fetcher := range fetchers { 95 fetcherToChunksGroupIdx[fetcher] = i 96 } 97 98 for _, chunkID := range ingesterChunkIDs { 99 chk, err := chunk.ParseExternalKey(userID, chunkID) 100 if err != nil { 101 return nil, nil, err 102 } 103 104 // ToDo(Sandeep) possible optimization: Keep the chunk fetcher reference handy after first call since it is expected to stay the same. 105 fetcher := a.Store.GetChunkFetcher(chk.Through) 106 if fetcher == nil { 107 return nil, nil, fmt.Errorf("got a nil fetcher for chunk %s", chk.ExternalKey()) 108 } 109 110 if _, ok := fetcherToChunksGroupIdx[fetcher]; !ok { 111 fetchers = append(fetchers, fetcher) 112 storeChunks = append(storeChunks, []chunk.Chunk{}) 113 fetcherToChunksGroupIdx[fetcher] = len(fetchers) - 1 114 } 115 chunksGroupIdx := fetcherToChunksGroupIdx[fetcher] 116 117 storeChunks[chunksGroupIdx] = append(storeChunks[chunksGroupIdx], chk) 118 } 119 120 return storeChunks, fetchers, nil 121} 122 123func filterDuplicateChunks(storeChunks [][]chunk.Chunk, ingesterChunkIDs []string) []string { 124 filteredChunkIDs := make([]string, 0, len(ingesterChunkIDs)) 125 seen := make(map[string]struct{}, len(storeChunks)) 126 127 for i := range storeChunks { 128 for j := range storeChunks[i] { 129 seen[storeChunks[i][j].ExternalKey()] = struct{}{} 130 } 131 } 132 133 for _, chunkID := range ingesterChunkIDs { 134 if _, ok := seen[chunkID]; !ok { 135 filteredChunkIDs = append(filteredChunkIDs, chunkID) 136 seen[chunkID] = struct{}{} 137 } 138 } 139 140 return filteredChunkIDs 141} 142